|
@@ -20,13 +20,20 @@
|
|
#include "util.h"
|
|
#include "util.h"
|
|
#include "channel.h"
|
|
#include "channel.h"
|
|
|
|
|
|
|
|
+#include <boost/asio.hpp>
|
|
|
|
+#include <boost/date_time/posix_time/posix_time.hpp>
|
|
|
|
+
|
|
#include <log4cpp/Category.hh>
|
|
#include <log4cpp/Category.hh>
|
|
|
|
|
|
static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__);
|
|
static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__);
|
|
-const int SUBSCRIBER_RECONNECT_TIME = 3000; // 3 seconds
|
|
|
|
|
|
+
|
|
using namespace Hedwig;
|
|
using namespace Hedwig;
|
|
|
|
+const int DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME = 5000;
|
|
|
|
+const int DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = 5000;
|
|
|
|
+const int DEFAULT_MAX_MESSAGE_QUEUE_SIZE = 10;
|
|
|
|
+const int DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = 5000;
|
|
|
|
|
|
-SubscriberWriteCallback::SubscriberWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
|
|
|
|
+SubscriberWriteCallback::SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
|
|
|
|
void SubscriberWriteCallback::operationComplete() {
|
|
void SubscriberWriteCallback::operationComplete() {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -35,29 +42,43 @@ void SubscriberWriteCallback::operationComplete() {
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberWriteCallback::operationFailed(const std::exception& exception) {
|
|
void SubscriberWriteCallback::operationFailed(const std::exception& exception) {
|
|
- LOG.errorStream() << "Error writing to publisher " << exception.what();
|
|
|
|
|
|
+ LOG.errorStream() << "Error writing to subscriber " << exception.what();
|
|
|
|
|
|
//remove txn from channel pending list
|
|
//remove txn from channel pending list
|
|
- #warning "Actually do something here"
|
|
|
|
|
|
+ data->getCallback()->operationFailed(exception);
|
|
|
|
+ client->getSubscriberImpl().closeSubscription(data->getTopic(), data->getSubscriberId());
|
|
}
|
|
}
|
|
|
|
|
|
-UnsubscribeWriteCallback::UnsubscribeWriteCallback(ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
|
|
|
|
+UnsubscribeWriteCallback::UnsubscribeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
|
|
|
|
void UnsubscribeWriteCallback::operationComplete() {
|
|
void UnsubscribeWriteCallback::operationComplete() {
|
|
-
|
|
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debugStream() << "Successfully wrote unsubscribe transaction: " << data->getTxnId();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
void UnsubscribeWriteCallback::operationFailed(const std::exception& exception) {
|
|
void UnsubscribeWriteCallback::operationFailed(const std::exception& exception) {
|
|
- #warning "Actually do something here"
|
|
|
|
|
|
+ data->getCallback()->operationFailed(exception);
|
|
}
|
|
}
|
|
|
|
|
|
-ConsumeWriteCallback::ConsumeWriteCallback(const PubSubDataPtr& data)
|
|
|
|
- : data(data) {
|
|
|
|
|
|
+ConsumeWriteCallback::ConsumeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data)
|
|
|
|
+ : client(client), data(data) {
|
|
}
|
|
}
|
|
|
|
|
|
ConsumeWriteCallback::~ConsumeWriteCallback() {
|
|
ConsumeWriteCallback::~ConsumeWriteCallback() {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+/* static */ void ConsumeWriteCallback::timerComplete(const ClientImplPtr& client, const PubSubDataPtr& data,
|
|
|
|
+ const boost::system::error_code& error) {
|
|
|
|
+ if (error) {
|
|
|
|
+ // shutting down
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ client->getSubscriberImpl().consume(data->getTopic(), data->getSubscriberId(), data->getMessageSeqId());
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
void ConsumeWriteCallback::operationComplete() {
|
|
void ConsumeWriteCallback::operationComplete() {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debugStream() << "Successfully wrote consume transaction: " << data->getTxnId();
|
|
LOG.debugStream() << "Successfully wrote consume transaction: " << data->getTxnId();
|
|
@@ -65,24 +86,54 @@ void ConsumeWriteCallback::operationComplete() {
|
|
}
|
|
}
|
|
|
|
|
|
void ConsumeWriteCallback::operationFailed(const std::exception& exception) {
|
|
void ConsumeWriteCallback::operationFailed(const std::exception& exception) {
|
|
- LOG.errorStream() << "Error writing consume transaction: " << data->getTxnId() << " error: " << exception.what();
|
|
|
|
|
|
+ int retrywait = client->getConfiguration().getInt(Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME,
|
|
|
|
+ DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME);
|
|
|
|
+ LOG.errorStream() << "Error writing consume transaction: " << data->getTxnId() << " error: " << exception.what()
|
|
|
|
+ << " retrying in " << retrywait << " Microseconds";
|
|
|
|
+
|
|
|
|
+ boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
|
|
|
|
+
|
|
|
|
+ t.async_wait(boost::bind(&ConsumeWriteCallback::timerComplete, client, data, boost::asio::placeholders::error));
|
|
}
|
|
}
|
|
|
|
|
|
-SubscriberConsumeCallback::SubscriberConsumeCallback(ClientImplPtr& client, const std::string& topic, const std::string& subscriberid, const MessageSeqId& msgid)
|
|
|
|
- : client(client), topic(topic), subscriberid(subscriberid), msgid(msgid)
|
|
|
|
|
|
+SubscriberConsumeCallback::SubscriberConsumeCallback(const ClientImplPtr& client,
|
|
|
|
+ const SubscriberClientChannelHandlerPtr& handler,
|
|
|
|
+ const PubSubDataPtr& data, const PubSubResponsePtr& m)
|
|
|
|
+ : client(client), handler(handler), data(data), m(m)
|
|
{
|
|
{
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberConsumeCallback::operationComplete() {
|
|
void SubscriberConsumeCallback::operationComplete() {
|
|
- LOG.errorStream() << "ConsumeCallback::operationComplete";
|
|
|
|
- client->getSubscriber().consume(topic, subscriberid, msgid);
|
|
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debugStream() << "ConsumeCallback::operationComplete " << data->getTopic() << " - " << data->getSubscriberId();
|
|
|
|
+ };
|
|
|
|
+ client->getSubscriber().consume(data->getTopic(), data->getSubscriberId(), m->message().msgid());
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/* static */ void SubscriberConsumeCallback::timerComplete(const SubscriberClientChannelHandlerPtr handler,
|
|
|
|
+ const PubSubResponsePtr m,
|
|
|
|
+ const boost::system::error_code& error) {
|
|
|
|
+ if (error) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ handler->messageReceived(handler->getChannel(), m);
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberConsumeCallback::operationFailed(const std::exception& exception) {
|
|
void SubscriberConsumeCallback::operationFailed(const std::exception& exception) {
|
|
- LOG.errorStream() << "ConsumeCallback::operationFailed";
|
|
|
|
|
|
+ LOG.errorStream() << "ConsumeCallback::operationFailed " << data->getTopic() << " - " << data->getSubscriberId();
|
|
|
|
+
|
|
|
|
+ int retrywait = client->getConfiguration().getInt(Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME,
|
|
|
|
+ DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME);
|
|
|
|
+
|
|
|
|
+ LOG.errorStream() << "Error passing message to client transaction: " << data->getTxnId() << " error: " << exception.what()
|
|
|
|
+ << " retrying in " << retrywait << " Microseconds";
|
|
|
|
+
|
|
|
|
+ boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
|
|
|
|
+
|
|
|
|
+ t.async_wait(boost::bind(&SubscriberConsumeCallback::timerComplete, handler, m, boost::asio::placeholders::error));
|
|
}
|
|
}
|
|
|
|
|
|
-SubscriberReconnectCallback::SubscriberReconnectCallback(ClientImplPtr& client, const PubSubDataPtr& origData)
|
|
|
|
|
|
+SubscriberReconnectCallback::SubscriberReconnectCallback(const ClientImplPtr& client, const PubSubDataPtr& origData)
|
|
: client(client), origData(origData) {
|
|
: client(client), origData(origData) {
|
|
}
|
|
}
|
|
|
|
|
|
@@ -90,11 +141,13 @@ void SubscriberReconnectCallback::operationComplete() {
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberReconnectCallback::operationFailed(const std::exception& exception) {
|
|
void SubscriberReconnectCallback::operationFailed(const std::exception& exception) {
|
|
-
|
|
|
|
|
|
+ LOG.errorStream() << "Error writing to new subscriber. Channel should pick this up disconnect the channel and try to connect again " << exception.what();
|
|
|
|
+
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
-SubscriberClientChannelHandler::SubscriberClientChannelHandler(ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
|
|
|
|
- : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false) {
|
|
|
|
|
|
+SubscriberClientChannelHandler::SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
|
|
|
|
+ : HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false), should_wait(true) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debugStream() << "Creating SubscriberClientChannelHandler " << this;
|
|
LOG.debugStream() << "Creating SubscriberClientChannelHandler " << this;
|
|
}
|
|
}
|
|
@@ -106,18 +159,21 @@ SubscriberClientChannelHandler::~SubscriberClientChannelHandler() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-void SubscriberClientChannelHandler::messageReceived(DuplexChannel* channel, const PubSubResponse& m) {
|
|
|
|
- if (m.has_message()) {
|
|
|
|
|
|
+void SubscriberClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
|
|
|
|
+ if (m->has_message()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debugStream() << "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")";
|
|
LOG.debugStream() << "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")";
|
|
}
|
|
}
|
|
|
|
|
|
if (this->handler.get()) {
|
|
if (this->handler.get()) {
|
|
- OperationCallbackPtr callback(new SubscriberConsumeCallback(client, origData->getTopic(), origData->getSubscriberId(), m.message().msgid()));
|
|
|
|
- this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m.message(), callback);
|
|
|
|
|
|
+ OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
|
|
|
|
+ this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
|
|
} else {
|
|
} else {
|
|
- LOG.debugStream() << "putting in queue";
|
|
|
|
- queue.push_back(m.message());
|
|
|
|
|
|
+ queue.push_back(m);
|
|
|
|
+ if (queue.size() >= (std::size_t)client->getConfiguration().getInt(Configuration::MAX_MESSAGE_QUEUE_SIZE,
|
|
|
|
+ DEFAULT_MAX_MESSAGE_QUEUE_SIZE)) {
|
|
|
|
+ channel->stopReceiving();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
HedwigClientChannelHandler::messageReceived(channel, m);
|
|
HedwigClientChannelHandler::messageReceived(channel, m);
|
|
@@ -126,12 +182,23 @@ void SubscriberClientChannelHandler::messageReceived(DuplexChannel* channel, con
|
|
|
|
|
|
void SubscriberClientChannelHandler::close() {
|
|
void SubscriberClientChannelHandler::close() {
|
|
closed = true;
|
|
closed = true;
|
|
|
|
+
|
|
if (channel) {
|
|
if (channel) {
|
|
channel->kill();
|
|
channel->kill();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-void SubscriberClientChannelHandler::channelDisconnected(DuplexChannel* channel, const std::exception& e) {
|
|
|
|
|
|
+/*static*/ void SubscriberClientChannelHandler::reconnectTimerComplete(const SubscriberClientChannelHandlerPtr handler,
|
|
|
|
+ const DuplexChannelPtr channel, const std::exception e,
|
|
|
|
+ const boost::system::error_code& error) {
|
|
|
|
+ if (error) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ handler->should_wait = false;
|
|
|
|
+ handler->channelDisconnected(channel, e);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+void SubscriberClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) {
|
|
// has subscription been closed
|
|
// has subscription been closed
|
|
if (closed) {
|
|
if (closed) {
|
|
return;
|
|
return;
|
|
@@ -142,59 +209,60 @@ void SubscriberClientChannelHandler::channelDisconnected(DuplexChannel* channel,
|
|
if (client->shuttingDown()) {
|
|
if (client->shuttingDown()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ if (should_wait) {
|
|
|
|
+ int retrywait = client->getConfiguration().getInt(Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
|
|
|
|
+ DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
|
|
|
|
+
|
|
|
|
+ boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
|
|
|
|
+ t.async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(),
|
|
|
|
+ channel, e, boost::asio::placeholders::error));
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ should_wait = true;
|
|
|
|
+
|
|
// setup pubsub data for reconnection attempt
|
|
// setup pubsub data for reconnection attempt
|
|
origData->clearTriedServers();
|
|
origData->clearTriedServers();
|
|
OperationCallbackPtr newcallback(new SubscriberReconnectCallback(client, origData));
|
|
OperationCallbackPtr newcallback(new SubscriberReconnectCallback(client, origData));
|
|
origData->setCallback(newcallback);
|
|
origData->setCallback(newcallback);
|
|
|
|
|
|
// Create a new handler for the new channel
|
|
// Create a new handler for the new channel
|
|
- SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, subscriber, origData));
|
|
|
|
- ChannelHandlerPtr baseptr = handler;
|
|
|
|
- // if there is an error createing the channel, sleep for SUBSCRIBER_RECONNECT_TIME and try again
|
|
|
|
- DuplexChannelPtr newchannel;
|
|
|
|
- while (true) {
|
|
|
|
- try {
|
|
|
|
- newchannel = client->createChannelForTopic(origData->getTopic(), baseptr);
|
|
|
|
- handler->setChannel(newchannel);
|
|
|
|
- break;
|
|
|
|
- } catch (ShuttingDownException& e) {
|
|
|
|
- LOG.errorStream() << "Shutting down, don't try to reconnect";
|
|
|
|
- return;
|
|
|
|
- } catch (ChannelException& e) {
|
|
|
|
- LOG.errorStream() << "Couldn't acquire channel, sleeping for " << SUBSCRIBER_RECONNECT_TIME << " before trying again";
|
|
|
|
- usleep(SUBSCRIBER_RECONNECT_TIME);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- handoverDelivery(handler.get());
|
|
|
|
|
|
+ SubscriberClientChannelHandlerPtr newhandler(new SubscriberClientChannelHandler(client, subscriber, origData));
|
|
|
|
+ ChannelHandlerPtr baseptr = newhandler;
|
|
|
|
+
|
|
|
|
+ DuplexChannelPtr newchannel = client->createChannel(origData->getTopic(), baseptr);
|
|
|
|
+ newhandler->setChannel(newchannel);
|
|
|
|
+ handoverDelivery(newhandler);
|
|
|
|
|
|
// remove record of the failed channel from the subscriber
|
|
// remove record of the failed channel from the subscriber
|
|
- subscriber.closeSubscription(origData->getTopic(), origData->getSubscriberId());
|
|
|
|
-
|
|
|
|
|
|
+ client->getSubscriberImpl().closeSubscription(origData->getTopic(), origData->getSubscriberId());
|
|
|
|
+
|
|
// subscriber
|
|
// subscriber
|
|
- subscriber.doSubscribe(newchannel, origData, handler);
|
|
|
|
|
|
+ client->getSubscriberImpl().doSubscribe(newchannel, origData, newhandler);
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
|
|
void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
|
|
this->handler = handler;
|
|
this->handler = handler;
|
|
|
|
|
|
while (!queue.empty()) {
|
|
while (!queue.empty()) {
|
|
- LOG.debugStream() << "Taking from queue";
|
|
|
|
- Message m = queue.front();
|
|
|
|
|
|
+ PubSubResponsePtr m = queue.front();
|
|
queue.pop_front();
|
|
queue.pop_front();
|
|
|
|
|
|
- OperationCallbackPtr callback(new SubscriberConsumeCallback(client, origData->getTopic(), origData->getSubscriberId(), m.msgid()));
|
|
|
|
|
|
+ OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
|
|
|
|
|
|
- this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m, callback);
|
|
|
|
|
|
+ this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
|
|
}
|
|
}
|
|
|
|
+ channel->startReceiving();
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberClientChannelHandler::stopDelivery() {
|
|
void SubscriberClientChannelHandler::stopDelivery() {
|
|
|
|
+ channel->stopReceiving();
|
|
|
|
+
|
|
this->handler = MessageHandlerCallbackPtr();
|
|
this->handler = MessageHandlerCallbackPtr();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
-void SubscriberClientChannelHandler::handoverDelivery(SubscriberClientChannelHandler* newHandler) {
|
|
|
|
|
|
+void SubscriberClientChannelHandler::handoverDelivery(const SubscriberClientChannelHandlerPtr& newHandler) {
|
|
LOG.debugStream() << "Messages in queue " << queue.size();
|
|
LOG.debugStream() << "Messages in queue " << queue.size();
|
|
MessageHandlerCallbackPtr handler = this->handler;
|
|
MessageHandlerCallbackPtr handler = this->handler;
|
|
stopDelivery(); // resets old handler
|
|
stopDelivery(); // resets old handler
|
|
@@ -209,7 +277,7 @@ DuplexChannelPtr& SubscriberClientChannelHandler::getChannel() {
|
|
return channel;
|
|
return channel;
|
|
}
|
|
}
|
|
|
|
|
|
-SubscriberImpl::SubscriberImpl(ClientImplPtr& client)
|
|
|
|
|
|
+SubscriberImpl::SubscriberImpl(const ClientImplPtr& client)
|
|
: client(client)
|
|
: client(client)
|
|
{
|
|
{
|
|
}
|
|
}
|
|
@@ -221,7 +289,8 @@ SubscriberImpl::~SubscriberImpl()
|
|
|
|
|
|
|
|
|
|
void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) {
|
|
void SubscriberImpl::subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) {
|
|
- SyncOperationCallback* cb = new SyncOperationCallback();
|
|
|
|
|
|
+ SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,
|
|
|
|
+ DEFAULT_SYNC_REQUEST_TIMEOUT));
|
|
OperationCallbackPtr callback(cb);
|
|
OperationCallbackPtr callback(cb);
|
|
asyncSubscribe(topic, subscriberId, mode, callback);
|
|
asyncSubscribe(topic, subscriberId, mode, callback);
|
|
cb->wait();
|
|
cb->wait();
|
|
@@ -232,37 +301,35 @@ void SubscriberImpl::subscribe(const std::string& topic, const std::string& subs
|
|
void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) {
|
|
void SubscriberImpl::asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) {
|
|
PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, mode);
|
|
PubSubDataPtr data = PubSubData::forSubscribeRequest(client->counter().next(), subscriberId, topic, callback, mode);
|
|
|
|
|
|
- SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, *this, data));
|
|
|
|
|
|
+ SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(client, *this, data));
|
|
ChannelHandlerPtr baseptr = handler;
|
|
ChannelHandlerPtr baseptr = handler;
|
|
- DuplexChannelPtr channel = client->createChannelForTopic(topic, baseptr);
|
|
|
|
-
|
|
|
|
- handler->setChannel(channel);
|
|
|
|
|
|
|
|
|
|
+ DuplexChannelPtr channel = client->createChannel(topic, handler);
|
|
|
|
+ handler->setChannel(channel);
|
|
doSubscribe(channel, data, handler);
|
|
doSubscribe(channel, data, handler);
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberImpl::doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler) {
|
|
void SubscriberImpl::doSubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data, const SubscriberClientChannelHandlerPtr& handler) {
|
|
- LOG.debugStream() << "doSubscribe";
|
|
|
|
channel->storeTransaction(data);
|
|
channel->storeTransaction(data);
|
|
|
|
|
|
OperationCallbackPtr writecb(new SubscriberWriteCallback(client, data));
|
|
OperationCallbackPtr writecb(new SubscriberWriteCallback(client, data));
|
|
channel->writeRequest(data->getRequest(), writecb);
|
|
channel->writeRequest(data->getRequest(), writecb);
|
|
|
|
|
|
- topicsubscriber2handler_lock.lock();
|
|
|
|
|
|
+ boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
|
|
TopicSubscriber t(data->getTopic(), data->getSubscriberId());
|
|
TopicSubscriber t(data->getTopic(), data->getSubscriberId());
|
|
SubscriberClientChannelHandlerPtr oldhandler = topicsubscriber2handler[t];
|
|
SubscriberClientChannelHandlerPtr oldhandler = topicsubscriber2handler[t];
|
|
if (oldhandler != NULL) {
|
|
if (oldhandler != NULL) {
|
|
- oldhandler->handoverDelivery(handler.get());
|
|
|
|
|
|
+ oldhandler->handoverDelivery(handler);
|
|
}
|
|
}
|
|
topicsubscriber2handler[t] = handler;
|
|
topicsubscriber2handler[t] = handler;
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debugStream() << "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")";
|
|
LOG.debugStream() << "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")";
|
|
}
|
|
}
|
|
- topicsubscriber2handler_lock.unlock();;
|
|
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
|
|
void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
|
|
- SyncOperationCallback* cb = new SyncOperationCallback();
|
|
|
|
|
|
+ SyncOperationCallback* cb = new SyncOperationCallback(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,
|
|
|
|
+ DEFAULT_SYNC_REQUEST_TIMEOUT));
|
|
OperationCallbackPtr callback(cb);
|
|
OperationCallbackPtr callback(cb);
|
|
asyncUnsubscribe(topic, subscriberId, callback);
|
|
asyncUnsubscribe(topic, subscriberId, callback);
|
|
cb->wait();
|
|
cb->wait();
|
|
@@ -275,14 +342,8 @@ void SubscriberImpl::asyncUnsubscribe(const std::string& topic, const std::strin
|
|
|
|
|
|
PubSubDataPtr data = PubSubData::forUnsubscribeRequest(client->counter().next(), subscriberId, topic, callback);
|
|
PubSubDataPtr data = PubSubData::forUnsubscribeRequest(client->counter().next(), subscriberId, topic, callback);
|
|
|
|
|
|
- DuplexChannelPtr channel = client->getChannelForTopic(topic);
|
|
|
|
- if (channel.get() == 0) {
|
|
|
|
- LOG.errorStream() << "Trying to unsubscribe from (" << topic << ", " << subscriberId << ") but channel is dead";
|
|
|
|
- callback->operationFailed(InvalidStateException());
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- doUnsubscribe(channel, data);
|
|
|
|
|
|
+ DuplexChannelPtr channel = client->getChannel(topic);
|
|
|
|
+ doUnsubscribe(channel, data);
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberImpl::doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data) {
|
|
void SubscriberImpl::doUnsubscribe(const DuplexChannelPtr& channel, const PubSubDataPtr& data) {
|
|
@@ -293,10 +354,9 @@ void SubscriberImpl::doUnsubscribe(const DuplexChannelPtr& channel, const PubSub
|
|
|
|
|
|
void SubscriberImpl::consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) {
|
|
void SubscriberImpl::consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) {
|
|
TopicSubscriber t(topic, subscriberId);
|
|
TopicSubscriber t(topic, subscriberId);
|
|
-
|
|
|
|
- topicsubscriber2handler_lock.lock();
|
|
|
|
|
|
+
|
|
|
|
+ boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
- topicsubscriber2handler_lock.unlock();
|
|
|
|
|
|
|
|
if (handler.get() == 0) {
|
|
if (handler.get() == 0) {
|
|
LOG.errorStream() << "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")";
|
|
LOG.errorStream() << "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")";
|
|
@@ -309,16 +369,15 @@ void SubscriberImpl::consume(const std::string& topic, const std::string& subscr
|
|
}
|
|
}
|
|
|
|
|
|
PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);
|
|
PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);
|
|
- OperationCallbackPtr writecb(new ConsumeWriteCallback(data));
|
|
|
|
|
|
+ OperationCallbackPtr writecb(new ConsumeWriteCallback(client, data));
|
|
channel->writeRequest(data->getRequest(), writecb);
|
|
channel->writeRequest(data->getRequest(), writecb);
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) {
|
|
void SubscriberImpl::startDelivery(const std::string& topic, const std::string& subscriberId, const MessageHandlerCallbackPtr& callback) {
|
|
TopicSubscriber t(topic, subscriberId);
|
|
TopicSubscriber t(topic, subscriberId);
|
|
|
|
|
|
- topicsubscriber2handler_lock.lock();
|
|
|
|
|
|
+ boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
- topicsubscriber2handler_lock.unlock();
|
|
|
|
|
|
|
|
if (handler.get() == 0) {
|
|
if (handler.get() == 0) {
|
|
LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
|
|
LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
|
|
@@ -328,10 +387,9 @@ void SubscriberImpl::startDelivery(const std::string& topic, const std::string&
|
|
|
|
|
|
void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& subscriberId) {
|
|
void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& subscriberId) {
|
|
TopicSubscriber t(topic, subscriberId);
|
|
TopicSubscriber t(topic, subscriberId);
|
|
-
|
|
|
|
- topicsubscriber2handler_lock.lock();
|
|
|
|
|
|
+
|
|
|
|
+ boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
- topicsubscriber2handler_lock.unlock();
|
|
|
|
|
|
|
|
if (handler.get() == 0) {
|
|
if (handler.get() == 0) {
|
|
LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
|
|
LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
|
|
@@ -345,11 +403,14 @@ void SubscriberImpl::closeSubscription(const std::string& topic, const std::stri
|
|
}
|
|
}
|
|
TopicSubscriber t(topic, subscriberId);
|
|
TopicSubscriber t(topic, subscriberId);
|
|
|
|
|
|
- topicsubscriber2handler_lock.lock();;
|
|
|
|
- SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
|
|
- topicsubscriber2handler.erase(t);
|
|
|
|
- topicsubscriber2handler_lock.unlock();;
|
|
|
|
- if (handler) {
|
|
|
|
|
|
+ SubscriberClientChannelHandlerPtr handler;
|
|
|
|
+ {
|
|
|
|
+ boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
|
|
|
|
+ handler = topicsubscriber2handler[t];
|
|
|
|
+ topicsubscriber2handler.erase(t);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (handler.get() != 0) {
|
|
handler->close();
|
|
handler->close();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -357,16 +418,16 @@ void SubscriberImpl::closeSubscription(const std::string& topic, const std::stri
|
|
/**
|
|
/**
|
|
takes ownership of txn
|
|
takes ownership of txn
|
|
*/
|
|
*/
|
|
-void SubscriberImpl::messageHandler(const PubSubResponse& m, const PubSubDataPtr& txn) {
|
|
|
|
|
|
+void SubscriberImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn) {
|
|
if (!txn.get()) {
|
|
if (!txn.get()) {
|
|
LOG.errorStream() << "Invalid transaction";
|
|
LOG.errorStream() << "Invalid transaction";
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
- LOG.debugStream() << "message received with status " << m.statuscode();
|
|
|
|
|
|
+ LOG.debugStream() << "message received with status " << m->statuscode();
|
|
}
|
|
}
|
|
- switch (m.statuscode()) {
|
|
|
|
|
|
+ switch (m->statuscode()) {
|
|
case SUCCESS:
|
|
case SUCCESS:
|
|
txn->getCallback()->operationComplete();
|
|
txn->getCallback()->operationComplete();
|
|
break;
|
|
break;
|