|
@@ -23,9 +23,9 @@
|
|
#include <boost/asio.hpp>
|
|
#include <boost/asio.hpp>
|
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
|
|
|
|
|
-#include <log4cpp/Category.hh>
|
|
|
|
|
|
+#include <log4cxx/logger.h>
|
|
|
|
|
|
-static log4cpp::Category &LOG = log4cpp::Category::getInstance("hedwig."__FILE__);
|
|
|
|
|
|
+static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
|
|
|
|
|
|
using namespace Hedwig;
|
|
using namespace Hedwig;
|
|
const int DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME = 5000;
|
|
const int DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME = 5000;
|
|
@@ -36,13 +36,11 @@ const int DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = 5000;
|
|
SubscriberWriteCallback::SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
SubscriberWriteCallback::SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
|
|
|
|
void SubscriberWriteCallback::operationComplete() {
|
|
void SubscriberWriteCallback::operationComplete() {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "Successfully wrote subscribe transaction: " << data->getTxnId();
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Successfully wrote subscribe transaction: " << data->getTxnId());
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberWriteCallback::operationFailed(const std::exception& exception) {
|
|
void SubscriberWriteCallback::operationFailed(const std::exception& exception) {
|
|
- LOG.errorStream() << "Error writing to subscriber " << exception.what();
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Error writing to subscriber " << exception.what());
|
|
|
|
|
|
//remove txn from channel pending list
|
|
//remove txn from channel pending list
|
|
data->getCallback()->operationFailed(exception);
|
|
data->getCallback()->operationFailed(exception);
|
|
@@ -52,9 +50,7 @@ void SubscriberWriteCallback::operationFailed(const std::exception& exception) {
|
|
UnsubscribeWriteCallback::UnsubscribeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
UnsubscribeWriteCallback::UnsubscribeWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
|
|
|
|
|
|
void UnsubscribeWriteCallback::operationComplete() {
|
|
void UnsubscribeWriteCallback::operationComplete() {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "Successfully wrote unsubscribe transaction: " << data->getTxnId();
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Successfully wrote unsubscribe transaction: " << data->getTxnId());
|
|
}
|
|
}
|
|
|
|
|
|
void UnsubscribeWriteCallback::operationFailed(const std::exception& exception) {
|
|
void UnsubscribeWriteCallback::operationFailed(const std::exception& exception) {
|
|
@@ -80,16 +76,14 @@ ConsumeWriteCallback::~ConsumeWriteCallback() {
|
|
|
|
|
|
|
|
|
|
void ConsumeWriteCallback::operationComplete() {
|
|
void ConsumeWriteCallback::operationComplete() {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "Successfully wrote consume transaction: " << data->getTxnId();
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Successfully wrote consume transaction: " << data->getTxnId());
|
|
}
|
|
}
|
|
|
|
|
|
void ConsumeWriteCallback::operationFailed(const std::exception& exception) {
|
|
void ConsumeWriteCallback::operationFailed(const std::exception& exception) {
|
|
int retrywait = client->getConfiguration().getInt(Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME,
|
|
int retrywait = client->getConfiguration().getInt(Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME,
|
|
DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME);
|
|
DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME);
|
|
- LOG.errorStream() << "Error writing consume transaction: " << data->getTxnId() << " error: " << exception.what()
|
|
|
|
- << " retrying in " << retrywait << " Microseconds";
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Error writing consume transaction: " << data->getTxnId() << " error: " << exception.what()
|
|
|
|
+ << " retrying in " << retrywait << " Microseconds");
|
|
|
|
|
|
boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
|
|
boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
|
|
|
|
|
|
@@ -104,9 +98,8 @@ SubscriberConsumeCallback::SubscriberConsumeCallback(const ClientImplPtr& client
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberConsumeCallback::operationComplete() {
|
|
void SubscriberConsumeCallback::operationComplete() {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "ConsumeCallback::operationComplete " << data->getTopic() << " - " << data->getSubscriberId();
|
|
|
|
- };
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "ConsumeCallback::operationComplete " << data->getTopic() << " - " << data->getSubscriberId());
|
|
|
|
+
|
|
client->getSubscriber().consume(data->getTopic(), data->getSubscriberId(), m->message().msgid());
|
|
client->getSubscriber().consume(data->getTopic(), data->getSubscriberId(), m->message().msgid());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -120,13 +113,13 @@ void SubscriberConsumeCallback::operationComplete() {
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberConsumeCallback::operationFailed(const std::exception& exception) {
|
|
void SubscriberConsumeCallback::operationFailed(const std::exception& exception) {
|
|
- LOG.errorStream() << "ConsumeCallback::operationFailed " << data->getTopic() << " - " << data->getSubscriberId();
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "ConsumeCallback::operationFailed " << data->getTopic() << " - " << data->getSubscriberId());
|
|
|
|
|
|
int retrywait = client->getConfiguration().getInt(Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME,
|
|
int retrywait = client->getConfiguration().getInt(Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME,
|
|
DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME);
|
|
DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME);
|
|
|
|
|
|
- LOG.errorStream() << "Error passing message to client transaction: " << data->getTxnId() << " error: " << exception.what()
|
|
|
|
- << " retrying in " << retrywait << " Microseconds";
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Error passing message to client transaction: " << data->getTxnId() << " error: " << exception.what()
|
|
|
|
+ << " retrying in " << retrywait << " Microseconds");
|
|
|
|
|
|
boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
|
|
boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
|
|
|
|
|
|
@@ -141,29 +134,22 @@ void SubscriberReconnectCallback::operationComplete() {
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberReconnectCallback::operationFailed(const std::exception& exception) {
|
|
void SubscriberReconnectCallback::operationFailed(const std::exception& exception) {
|
|
- LOG.errorStream() << "Error writing to new subscriber. Channel should pick this up disconnect the channel and try to connect again " << exception.what();
|
|
|
|
-
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Error writing to new subscriber. Channel should pick this up disconnect the channel and try to connect again " << exception.what());
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
SubscriberClientChannelHandler::SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
|
|
SubscriberClientChannelHandler::SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data)
|
|
: HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false), should_wait(true) {
|
|
: HedwigClientChannelHandler(client), subscriber(subscriber), origData(data), closed(false), should_wait(true) {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "Creating SubscriberClientChannelHandler " << this;
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Creating SubscriberClientChannelHandler " << this);
|
|
}
|
|
}
|
|
|
|
|
|
SubscriberClientChannelHandler::~SubscriberClientChannelHandler() {
|
|
SubscriberClientChannelHandler::~SubscriberClientChannelHandler() {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "Cleaning up SubscriberClientChannelHandler " << this;
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Cleaning up SubscriberClientChannelHandler " << this);
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
|
|
void SubscriberClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
|
|
if (m->has_message()) {
|
|
if (m->has_message()) {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")";
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")");
|
|
|
|
|
|
if (this->handler.get()) {
|
|
if (this->handler.get()) {
|
|
OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
|
|
OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
|
|
@@ -263,7 +249,7 @@ void SubscriberClientChannelHandler::stopDelivery() {
|
|
|
|
|
|
|
|
|
|
void SubscriberClientChannelHandler::handoverDelivery(const SubscriberClientChannelHandlerPtr& newHandler) {
|
|
void SubscriberClientChannelHandler::handoverDelivery(const SubscriberClientChannelHandlerPtr& newHandler) {
|
|
- LOG.debugStream() << "Messages in queue " << queue.size();
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Messages in queue " << queue.size());
|
|
MessageHandlerCallbackPtr handler = this->handler;
|
|
MessageHandlerCallbackPtr handler = this->handler;
|
|
stopDelivery(); // resets old handler
|
|
stopDelivery(); // resets old handler
|
|
newHandler->startDelivery(handler);
|
|
newHandler->startDelivery(handler);
|
|
@@ -284,7 +270,7 @@ SubscriberImpl::SubscriberImpl(const ClientImplPtr& client)
|
|
|
|
|
|
SubscriberImpl::~SubscriberImpl()
|
|
SubscriberImpl::~SubscriberImpl()
|
|
{
|
|
{
|
|
- LOG.debugStream() << "deleting subscriber" << this;
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "deleting subscriber" << this);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -322,9 +308,8 @@ void SubscriberImpl::doSubscribe(const DuplexChannelPtr& channel, const PubSubDa
|
|
oldhandler->handoverDelivery(handler);
|
|
oldhandler->handoverDelivery(handler);
|
|
}
|
|
}
|
|
topicsubscriber2handler[t] = handler;
|
|
topicsubscriber2handler[t] = handler;
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")";
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ LOG4CXX_DEBUG(logger, "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")");
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
|
|
void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
|
|
@@ -359,13 +344,13 @@ void SubscriberImpl::consume(const std::string& topic, const std::string& subscr
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
|
|
|
|
if (handler.get() == 0) {
|
|
if (handler.get() == 0) {
|
|
- LOG.errorStream() << "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")";
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Cannot consume. Bad handler for topic(" << topic << ") subscriberId(" << subscriberId << ") topicsubscriber2topic(" << &topicsubscriber2handler << ")");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
DuplexChannelPtr channel = handler->getChannel();
|
|
DuplexChannelPtr channel = handler->getChannel();
|
|
if (channel.get() == 0) {
|
|
if (channel.get() == 0) {
|
|
- LOG.errorStream() << "Trying to consume a message on a topic/subscriber pair that don't have a channel. Something fishy going on. Topic: " << topic << " SubscriberId: " << subscriberId << " MessageSeqId: " << messageSeqId.localcomponent();
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Trying to consume a message on a topic/subscriber pair that don't have a channel. Something fishy going on. Topic: " << topic << " SubscriberId: " << subscriberId << " MessageSeqId: " << messageSeqId.localcomponent());
|
|
}
|
|
}
|
|
|
|
|
|
PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);
|
|
PubSubDataPtr data = PubSubData::forConsumeRequest(client->counter().next(), subscriberId, topic, messageSeqId);
|
|
@@ -380,7 +365,7 @@ void SubscriberImpl::startDelivery(const std::string& topic, const std::string&
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
|
|
|
|
if (handler.get() == 0) {
|
|
if (handler.get() == 0) {
|
|
- LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
|
|
}
|
|
}
|
|
handler->startDelivery(callback);
|
|
handler->startDelivery(callback);
|
|
}
|
|
}
|
|
@@ -392,15 +377,14 @@ void SubscriberImpl::stopDelivery(const std::string& topic, const std::string& s
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
SubscriberClientChannelHandlerPtr handler = topicsubscriber2handler[t];
|
|
|
|
|
|
if (handler.get() == 0) {
|
|
if (handler.get() == 0) {
|
|
- LOG.errorStream() << "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId;
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Trying to start deliver on a non existant handler topic = " << topic << ", subscriber = " << subscriberId);
|
|
}
|
|
}
|
|
handler->stopDelivery();
|
|
handler->stopDelivery();
|
|
}
|
|
}
|
|
|
|
|
|
void SubscriberImpl::closeSubscription(const std::string& topic, const std::string& subscriberId) {
|
|
void SubscriberImpl::closeSubscription(const std::string& topic, const std::string& subscriberId) {
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "closeSubscription (" << topic << ", " << subscriberId << ")";
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "closeSubscription (" << topic << ", " << subscriberId << ")");
|
|
|
|
+
|
|
TopicSubscriber t(topic, subscriberId);
|
|
TopicSubscriber t(topic, subscriberId);
|
|
|
|
|
|
SubscriberClientChannelHandlerPtr handler;
|
|
SubscriberClientChannelHandlerPtr handler;
|
|
@@ -420,13 +404,12 @@ void SubscriberImpl::closeSubscription(const std::string& topic, const std::stri
|
|
*/
|
|
*/
|
|
void SubscriberImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn) {
|
|
void SubscriberImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn) {
|
|
if (!txn.get()) {
|
|
if (!txn.get()) {
|
|
- LOG.errorStream() << "Invalid transaction";
|
|
|
|
|
|
+ LOG4CXX_ERROR(logger, "Invalid transaction");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debugStream() << "message received with status " << m->statuscode();
|
|
|
|
- }
|
|
|
|
|
|
+ LOG4CXX_DEBUG(logger, "message received with status " << m->statuscode());
|
|
|
|
+
|
|
switch (m->statuscode()) {
|
|
switch (m->statuscode()) {
|
|
case SUCCESS:
|
|
case SUCCESS:
|
|
txn->getCallback()->operationComplete();
|
|
txn->getCallback()->operationComplete();
|