Browse Source

ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer (Marshall McMullen via michim)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1484357 13f79535-47bb-0310-9956-ffa450edef68
Michi Mutsuzaki 12 years ago
parent
commit
37973fab29

+ 4 - 1
CHANGES.txt

@@ -12,7 +12,10 @@ NEW FEATURES:
   ZOOKEEPER-1572. Add an async (Java) interface for multi request (Sijie Guo via camille)
   ZOOKEEPER-1572. Add an async (Java) interface for multi request (Sijie Guo via camille)
 
 
   ZOOKEEPER-107. Allow dynamic changes to server cluster membership (Alex Shraer via breed)
   ZOOKEEPER-107. Allow dynamic changes to server cluster membership (Alex Shraer via breed)
-  
+
+  ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer
+  (Marshall McMullen via michim)
+
 BUGFIXES:
 BUGFIXES:
 
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString
   ZOOKEEPER-786. Exception in ZooKeeper.toString

+ 73 - 0
src/c/include/zookeeper.h

@@ -426,6 +426,14 @@ typedef struct zoo_op_result {
 typedef void (*watcher_fn)(zhandle_t *zh, int type,
 typedef void (*watcher_fn)(zhandle_t *zh, int type,
         int state, const char *path,void *watcherCtx);
         int state, const char *path,void *watcherCtx);
 
 
+/**
+ * \brief typedef for setting the log callback. It's a function pointer which
+ * returns void and accepts a const char* as its only argument.
+ *
+ * \param message message to be passed to the callback function.
+ */
+typedef void (*log_callback_fn)(const char *message);
+
 /**
 /**
  * \brief create a handle to used communicate with zookeeper.
  * \brief create a handle to used communicate with zookeeper.
  *
  *
@@ -457,6 +465,45 @@ typedef void (*watcher_fn)(zhandle_t *zh, int type,
 ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
 ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
   int recv_timeout, const clientid_t *clientid, void *context, int flags);
   int recv_timeout, const clientid_t *clientid, void *context, int flags);
 
 
+/**
+ * \brief create a handle to communicate with zookeeper.
+ *
+ * This function is identical to \ref zookeeper_init except it allows one
+ * to specify an additional callback to be used for all logging for that
+ * specific connection. For more details on the logging callback see
+ * \ref zoo_get_log_callback and \ref zoo_set_log_callback.
+ *
+ * This method creates a new handle and a zookeeper session that corresponds
+ * to that handle. Session establishment is asynchronous, meaning that the
+ * session should not be considered established until (and unless) an
+ * event of state ZOO_CONNECTED_STATE is received.
+ * \param host comma separated host:port pairs, each corresponding to a zk
+ *   server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * \param fn the global watcher callback function. When notifications are
+ *   triggered this function will be invoked.
+ * \param clientid the id of a previously established session that this
+ *   client will be reconnecting to. Pass 0 if not reconnecting to a previous
+ *   session. Clients can access the session id of an established, valid,
+ *   connection by calling \ref zoo_client_id. If the session corresponding to
+ *   the specified clientid has expired, or if the clientid is invalid for
+ *   any reason, the returned zhandle_t will be invalid -- the zhandle_t
+ *   state will indicate the reason for failure (typically
+ *   ZOO_EXPIRED_SESSION_STATE).
+ * \param context the handback object that will be associated with this instance
+ *   of zhandle_t. Application can access it (for example, in the watcher
+ *   callback) using \ref zoo_get_context. The object is not used by zookeeper
+ *   internally and can be null.
+ * \param flags reserved for future use. Should be set to zero.
+ * \param log_callback All log messages will be passed to this callback function.
+ *   For more details see \ref zoo_get_log_callback and \ref zoo_set_log_callback.
+ * \return a pointer to the opaque zhandle structure. If it fails to create
+ * a new zhandle the function returns NULL and the errno variable
+ * indicates the reason.
+ */
+ZOOAPI zhandle_t *zookeeper_init2(const char *host, watcher_fn fn,
+  int recv_timeout, const clientid_t *clientid, void *context, int flags,
+  log_callback_fn log_callback);
+
 /**
 /**
  * \brief update the list of servers this client will connect to.
  * \brief update the list of servers this client will connect to.
  *
  *
@@ -1407,6 +1454,32 @@ ZOOAPI void zoo_set_debug_level(ZooLogLevel logLevel);
  */
  */
 ZOOAPI void zoo_set_log_stream(FILE* logStream);
 ZOOAPI void zoo_set_log_stream(FILE* logStream);
 
 
+/**
+ * \brief gets the callback to be used by this connection for logging.
+ *
+ * This is a per-connection logging mechanism that will take priority over
+ * the library-wide default log stream. That is, zookeeper library will first
+ * try to use a per-connection callback if available and if not, will fallback
+ * to using the logging stream. Passing in NULL resets the callback and will
+ * cause it to then fallback to using the logging stream as described in \ref
+ * zoo_set_log_stream.
+ */
+ZOOAPI log_callback_fn zoo_get_log_callback(const zhandle_t *zh);
+
+/**
+ * \brief sets the callback to be used by the library for logging
+ *
+ * Setting this callback has the effect of overriding the default log stream.
+ * Zookeeper will first try to use a per-connection callback if available
+ * and if not, will fallback to using the logging stream. Passing in NULL
+ * resets the callback and will cause it to then fallback to using the logging
+ * stream as described in \ref zoo_set_log_stream.
+ *
+ * Note: The provided callback will be invoked by multiple threads and therefore
+ * it needs to be thread-safe.
+ */
+ZOOAPI void zoo_set_log_callback(zhandle_t *zh, log_callback_fn callback);
+
 /**
 /**
  * \brief enable/disable quorum endpoint order randomization
  * \brief enable/disable quorum endpoint order randomization
  *
  *

+ 16 - 17
src/c/include/zookeeper_log.h

@@ -26,23 +26,22 @@ extern "C" {
 #endif
 #endif
 
 
 extern ZOOAPI ZooLogLevel logLevel;
 extern ZOOAPI ZooLogLevel logLevel;
-#define LOGSTREAM getLogStream()
-
-#define LOG_ERROR(x) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
-    log_message(ZOO_LOG_LEVEL_ERROR,__LINE__,__func__,format_log_message x)
-#define LOG_WARN(x) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
-    log_message(ZOO_LOG_LEVEL_WARN,__LINE__,__func__,format_log_message x)
-#define LOG_INFO(x) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
-    log_message(ZOO_LOG_LEVEL_INFO,__LINE__,__func__,format_log_message x)
-#define LOG_DEBUG(x) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
-    log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
-
-ZOOAPI void log_message(ZooLogLevel curLevel, int line,const char* funcName,
-    const char* message);
-
-ZOOAPI const char* format_log_message(const char* format,...);
-
-FILE* getLogStream();
+#define LOGCALLBACK(_zh) zoo_get_log_callback(_zh)
+#define LOGSTREAM NULL
+
+#define LOG_ERROR(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_ERROR) \
+    log_message(_cb, ZOO_LOG_LEVEL_ERROR, __LINE__, __func__, __VA_ARGS__)
+#define LOG_WARN(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_WARN) \
+    log_message(_cb, ZOO_LOG_LEVEL_WARN, __LINE__, __func__, __VA_ARGS__)
+#define LOG_INFO(_cb, ...) if(logLevel>=ZOO_LOG_LEVEL_INFO) \
+    log_message(_cb, ZOO_LOG_LEVEL_INFO, __LINE__, __func__, __VA_ARGS__)
+#define LOG_DEBUG(_cb, ...) if(logLevel==ZOO_LOG_LEVEL_DEBUG) \
+    log_message(_cb, ZOO_LOG_LEVEL_DEBUG, __LINE__, __func__, __VA_ARGS__)
+
+ZOOAPI void log_message(log_callback_fn callback, ZooLogLevel curLevel,
+    int line, const char* funcName, const char* format, ...);
+
+FILE* zoo_get_log_stream();
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }

+ 15 - 15
src/c/src/load_gen.c

@@ -88,7 +88,7 @@ void listener(zhandle_t *zzh, int type, int state, const char *path,void* ctx) {
 void create_completion(int rc, const char *name, const void *data) {
 void create_completion(int rc, const char *name, const void *data) {
     incCounter(-1);
     incCounter(-1);
     if(rc!=ZOK){
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to create a node rc=%d",rc));
+        LOG_ERROR(LOGSTREAM, "Failed to create a node rc=%d",rc);
     }
     }
 }
 }
 
 
@@ -102,7 +102,7 @@ int doCreateNodes(const char* root, int count){
         rc=zoo_acreate(zh, nodeName, "first", 5, &ZOO_OPEN_ACL_UNSAFE, 0,
         rc=zoo_acreate(zh, nodeName, "first", 5, &ZOO_OPEN_ACL_UNSAFE, 0,
                             create_completion, 0);
                             create_completion, 0);
         if(i%1000==0){
         if(i%1000==0){
-            LOG_INFO(("Created %s",nodeName));
+            LOG_INFO(LOGSTREAM, "Created %s", nodeName);
         }
         }
         if(rc!=ZOK) return rc;        
         if(rc!=ZOK) return rc;        
     }
     }
@@ -116,7 +116,7 @@ int createRoot(const char* root){
 void write_completion(int rc, const struct Stat *stat, const void *data) {
 void write_completion(int rc, const struct Stat *stat, const void *data) {
     incCounter(-1);
     incCounter(-1);
     if(rc!=ZOK){
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to write a node rc=%d",rc));
+        LOG_ERROR(LOGSTREAM, "Failed to write a node rc=%d",rc);
     }
     }
 }
 }
 
 
@@ -137,13 +137,13 @@ void read_completion(int rc, const char *value, int value_len,
         const struct Stat *stat, const void *data) {
         const struct Stat *stat, const void *data) {
     incCounter(-1);    
     incCounter(-1);    
     if(rc!=ZOK){
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to read a node rc=%d",rc));
+        LOG_ERROR(LOGSTREAM, "Failed to read a node rc=%d",rc);
         return;
         return;
     }
     }
     if(memcmp(value,"second",6)!=0){
     if(memcmp(value,"second",6)!=0){
         char buf[value_len+1];
         char buf[value_len+1];
         memcpy(buf,value,value_len);buf[value_len]=0;
         memcpy(buf,value,value_len);buf[value_len]=0;
-        LOG_ERROR(("Invalid read, expected [second], received [%s]\n",buf));
+        LOG_ERROR(LOGSTREAM, "Invalid read, expected [second], received [%s]\n",buf);
         exit(1);
         exit(1);
     }
     }
 }
 }
@@ -198,7 +198,7 @@ int recursiveDelete(const char* root){
     int rc=zoo_get_children(zh,root,0,&children);
     int rc=zoo_get_children(zh,root,0,&children);
     if(rc!=ZNONODE){
     if(rc!=ZNONODE){
         if(rc!=ZOK){
         if(rc!=ZOK){
-            LOG_ERROR(("Failed to get children of %s, rc=%d",root,rc));
+            LOG_ERROR(LOGSTREAM, "Failed to get children of %s, rc=%d",root,rc);
             return rc;
             return rc;
         }
         }
         for(i=0;i<children.count; i++){
         for(i=0;i<children.count; i++){
@@ -214,10 +214,10 @@ int recursiveDelete(const char* root){
         free_String_vector(&children);
         free_String_vector(&children);
     }
     }
     if(deletedCounter%1000==0)
     if(deletedCounter%1000==0)
-        LOG_INFO(("Deleting %s",root));
+        LOG_INFO(LOGSTREAM, "Deleting %s",root);
     rc=zoo_delete(zh,root,-1);
     rc=zoo_delete(zh,root,-1);
     if(rc!=ZOK){
     if(rc!=ZOK){
-        LOG_ERROR(("Failed to delete znode %s, rc=%d",root,rc));
+        LOG_ERROR(LOGSTREAM, "Failed to delete znode %s, rc=%d",root,rc);
     }else
     }else
         deletedCounter++;
         deletedCounter++;
     return rc;
     return rc;
@@ -245,15 +245,15 @@ int main(int argc, char **argv) {
     if (!zh)
     if (!zh)
         return errno;
         return errno;
 
 
-    LOG_INFO(("Checking server connection..."));
+    LOG_INFO(LOGSTREAM, "Checking server connection...");
     ensureConnected();
     ensureConnected();
     if(cleaning==1){
     if(cleaning==1){
         int rc = 0;
         int rc = 0;
         deletedCounter=0;
         deletedCounter=0;
         rc=recursiveDelete(argv[2]);
         rc=recursiveDelete(argv[2]);
         if(rc==ZOK){
         if(rc==ZOK){
-            LOG_INFO(("Succesfully deleted a subtree starting at %s (%d nodes)",
-                    argv[2],deletedCounter));
+            LOG_INFO(LOGSTREAM, "Succesfully deleted a subtree starting at %s (%d nodes)",
+                    argv[2],deletedCounter);
             exit(0);
             exit(0);
         }
         }
         exit(1);
         exit(1);
@@ -262,18 +262,18 @@ int main(int argc, char **argv) {
     createRoot(argv[2]);
     createRoot(argv[2]);
     while(1) {
     while(1) {
         ensureConnected();
         ensureConnected();
-        LOG_INFO(("Creating children for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Creating children for path %s",argv[2]);
         doCreateNodes(argv[2],nodeCount);
         doCreateNodes(argv[2],nodeCount);
         waitCounter();
         waitCounter();
         
         
-        LOG_INFO(("Starting the write cycle for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Starting the write cycle for path %s",argv[2]);
         doWrites(argv[2],nodeCount);
         doWrites(argv[2],nodeCount);
         waitCounter();
         waitCounter();
-        LOG_INFO(("Starting the read cycle for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Starting the read cycle for path %s",argv[2]);
         doReads(argv[2],nodeCount);
         doReads(argv[2],nodeCount);
         waitCounter();
         waitCounter();
 
 
-        LOG_INFO(("Starting the delete cycle for path %s",argv[2]));
+        LOG_INFO(LOGSTREAM, "Starting the delete cycle for path %s",argv[2]);
         doDeletes(argv[2],nodeCount);
         doDeletes(argv[2],nodeCount);
         waitCounter();
         waitCounter();
     }
     }

+ 11 - 11
src/c/src/mt_adaptor.c

@@ -116,7 +116,7 @@ unsigned __stdcall do_completion( void * );
 
 
 int handle_error(SOCKET sock, char* message)
 int handle_error(SOCKET sock, char* message)
 {
 {
-       LOG_ERROR(("%s. %d",message, WSAGetLastError()));
+       LOG_ERROR(LOGCALLBACK(zh), "%s. %d",message, WSAGetLastError());
        closesocket (sock);
        closesocket (sock);
        return -1;
        return -1;
 }
 }
@@ -131,7 +131,7 @@ int create_socket_pair(SOCKET fds[2])
        
        
     SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP); 
     SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP); 
     if (lst ==  INVALID_SOCKET ){
     if (lst ==  INVALID_SOCKET ){
-       LOG_ERROR(("Error creating socket. %d",WSAGetLastError()));
+       LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError());
        return -1;
        return -1;
     }
     }
     memset(&inaddr, 0, sizeof(inaddr)); 
     memset(&inaddr, 0, sizeof(inaddr)); 
@@ -218,7 +218,7 @@ void start_threads(zhandle_t* zh)
     // use api_prolog() to make sure zhandle doesn't get destroyed
     // use api_prolog() to make sure zhandle doesn't get destroyed
     // while initialization is in progress
     // while initialization is in progress
     api_prolog(zh);
     api_prolog(zh);
-    LOG_DEBUG(("starting threads..."));
+    LOG_DEBUG(LOGCALLBACK(zh), "starting threads...");
     rc=pthread_create(&adaptor->io, 0, do_io, zh);
     rc=pthread_create(&adaptor->io, 0, do_io, zh);
     assert("pthread_create() failed for the IO thread"&&!rc);
     assert("pthread_create() failed for the IO thread"&&!rc);
     rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
     rc=pthread_create(&adaptor->completion, 0, do_completion, zh);
@@ -232,17 +232,17 @@ int adaptor_init(zhandle_t *zh)
     pthread_mutexattr_t recursive_mx_attr;
     pthread_mutexattr_t recursive_mx_attr;
     struct adaptor_threads *adaptor_threads = calloc(1, sizeof(*adaptor_threads));
     struct adaptor_threads *adaptor_threads = calloc(1, sizeof(*adaptor_threads));
     if (!adaptor_threads) {
     if (!adaptor_threads) {
-        LOG_ERROR(("Out of memory"));
+        LOG_ERROR(LOGCALLBACK(zh), "Out of memory");
         return -1;
         return -1;
     }
     }
 
 
     /* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */
     /* We use a pipe for interrupting select() in unix/sol and socketpair in windows. */
 #ifdef WIN32   
 #ifdef WIN32   
     if (create_socket_pair(adaptor_threads->self_pipe) == -1){
     if (create_socket_pair(adaptor_threads->self_pipe) == -1){
-       LOG_ERROR(("Can't make a socket."));
+       LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket.");
 #else
 #else
     if(pipe(adaptor_threads->self_pipe)==-1) {
     if(pipe(adaptor_threads->self_pipe)==-1) {
-        LOG_ERROR(("Can't make a pipe %d",errno));
+        LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
 #endif
 #endif
         free(adaptor_threads);
         free(adaptor_threads);
         return -1;
         return -1;
@@ -365,7 +365,7 @@ void *do_io(void *v)
 
 
     api_prolog(zh);
     api_prolog(zh);
     notify_thread_ready(zh);
     notify_thread_ready(zh);
-    LOG_DEBUG(("started IO thread"));
+    LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
     fds[0].fd=adaptor_threads->self_pipe[0];
     fds[0].fd=adaptor_threads->self_pipe[0];
     fds[0].events=POLLIN;
     fds[0].events=POLLIN;
     while(!zh->close_requested) {
     while(!zh->close_requested) {
@@ -400,7 +400,7 @@ void *do_io(void *v)
     struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
     struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
     api_prolog(zh);
     api_prolog(zh);
     notify_thread_ready(zh);
     notify_thread_ready(zh);
-    LOG_DEBUG(("started IO thread"));
+    LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
     FD_ZERO(&rfds);   FD_ZERO(&wfds);    FD_ZERO(&efds);
     FD_ZERO(&rfds);   FD_ZERO(&wfds);    FD_ZERO(&efds);
     while(!zh->close_requested) {      
     while(!zh->close_requested) {      
         struct timeval tv;
         struct timeval tv;
@@ -444,7 +444,7 @@ void *do_io(void *v)
             break;
             break;
     }
     }
     api_epilog(zh, 0);    
     api_epilog(zh, 0);    
-    LOG_DEBUG(("IO thread terminated"));
+    LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
     return 0;
     return 0;
 }
 }
 
 
@@ -457,7 +457,7 @@ void *do_completion(void *v)
     zhandle_t *zh = v;
     zhandle_t *zh = v;
     api_prolog(zh);
     api_prolog(zh);
     notify_thread_ready(zh);
     notify_thread_ready(zh);
-    LOG_DEBUG(("started completion thread"));
+    LOG_DEBUG(LOGCALLBACK(zh), "started completion thread");
     while(!zh->close_requested) {
     while(!zh->close_requested) {
         pthread_mutex_lock(&zh->completions_to_process.lock);
         pthread_mutex_lock(&zh->completions_to_process.lock);
         while(!zh->completions_to_process.head && !zh->close_requested) {
         while(!zh->completions_to_process.head && !zh->close_requested) {
@@ -467,7 +467,7 @@ void *do_completion(void *v)
         process_completions(zh);
         process_completions(zh);
     }
     }
     api_epilog(zh, 0);    
     api_epilog(zh, 0);    
-    LOG_DEBUG(("completion thread terminated"));
+    LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated");
     return 0;
     return 0;
 }
 }
 
 

+ 1 - 0
src/c/src/zk_adaptor.h

@@ -223,6 +223,7 @@ struct _zhandle {
     clientid_t client_id;               // client-id
     clientid_t client_id;               // client-id
     long long last_zxid;                // last zookeeper ID
     long long last_zxid;                // last zookeeper ID
     auth_list_head_t auth_h;            // authentication data list
     auth_list_head_t auth_h;            // authentication data list
+    log_callback_fn log_callback;       // Callback for logging (falls back to logging to stderr)  
 
 
     // Primer storage
     // Primer storage
     struct _buffer_list primer_buffer;  // The buffer used for the handshake at the start of a connection
     struct _buffer_list primer_buffer;  // The buffer used for the handshake at the start of a connection

+ 47 - 27
src/c/src/zk_log.c

@@ -86,7 +86,7 @@ char* get_format_log_buffer(){
 ZooLogLevel logLevel=ZOO_LOG_LEVEL_INFO;
 ZooLogLevel logLevel=ZOO_LOG_LEVEL_INFO;
 
 
 static FILE* logStream=0;
 static FILE* logStream=0;
-FILE* getLogStream(){
+FILE* zoo_get_log_stream(){
     if(logStream==0)
     if(logStream==0)
         logStream=stderr;
         logStream=stderr;
     return logStream;
     return logStream;
@@ -122,44 +122,64 @@ static const char* time_now(char* now_str){
     return now_str;
     return now_str;
 }
 }
 
 
-void log_message(ZooLogLevel curLevel,int line,const char* funcName,
-    const char* message)
+void log_message(log_callback_fn callback, ZooLogLevel curLevel,
+    int line, const char* funcName, const char* format, ...)
 {
 {
     static const char* dbgLevelStr[]={"ZOO_INVALID","ZOO_ERROR","ZOO_WARN",
     static const char* dbgLevelStr[]={"ZOO_INVALID","ZOO_ERROR","ZOO_WARN",
             "ZOO_INFO","ZOO_DEBUG"};
             "ZOO_INFO","ZOO_DEBUG"};
+
+    char* buf = get_format_log_buffer();
+    if(!buf)
+    {
+        fprintf(stderr, "log_message: Unable to allocate memory buffer");
+        return;
+    }
+
     static pid_t pid=0;
     static pid_t pid=0;
+
+    if(pid==0)
+    {
+        pid=getpid();
+    }
+
 #ifdef WIN32
 #ifdef WIN32
     char timebuf [TIME_NOW_BUF_SIZE];
     char timebuf [TIME_NOW_BUF_SIZE];
+    const char* time = time_now(timebuf);
+#else
+    const char* time = time_now(get_time_buffer());
 #endif
 #endif
-    if(pid==0)pid=getpid();
+
 #ifndef THREADED
 #ifndef THREADED
-    fprintf(LOGSTREAM, "%s:%d:%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
-            dbgLevelStr[curLevel],funcName,line,message);
-#else
-#ifdef WIN32
-    fprintf(LOGSTREAM, "%s:%d(0x%lx):%s@%s@%d: %s\n", time_now(timebuf),pid,
-            (unsigned long int)(pthread_self().thread_id),
-            dbgLevelStr[curLevel],funcName,line,message);      
+
+    int ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE,
+                       "%s:%d:%s@%s@%d: ", time, pid,
+                       dbgLevelStr[curLevel], funcName, line);
 #else
 #else
-    fprintf(LOGSTREAM, "%s:%d(0x%lx):%s@%s@%d: %s\n", time_now(get_time_buffer()),pid,
-            (unsigned long int)pthread_self(),
-            dbgLevelStr[curLevel],funcName,line,message);      
-#endif
+
+    #ifdef WIN32
+        unsigned long int tid = (unsigned long int)(pthread_self().thread_id);
+    #else
+        unsigned long int tid = (unsigned long int)(pthread_self());
+    #endif
+
+    int ofs = snprintf(buf, FORMAT_LOG_BUF_SIZE-1,
+                       "%s:%d(0x%lx):%s@%s@%d: ", time, pid, tid,
+                       dbgLevelStr[curLevel], funcName, line);
 #endif
 #endif
-    fflush(LOGSTREAM);
-}
 
 
-const char* format_log_message(const char* format,...)
-{
+    // Now grab the actual message out of the variadic arg list
     va_list va;
     va_list va;
-    char* buf=get_format_log_buffer();
-    if(!buf)
-        return "format_log_message: Unable to allocate memory buffer";
-    
-    va_start(va,format);
-    vsnprintf(buf, FORMAT_LOG_BUF_SIZE-1,format,va);
-    va_end(va); 
-    return buf;
+    va_start(va, format);
+    vsnprintf(buf+ofs, FORMAT_LOG_BUF_SIZE-1-ofs, format, va);
+    va_end(va);
+
+    if (callback)
+    {
+        callback(buf);
+    } else {
+        fprintf(zoo_get_log_stream(), "%s\n", buf);
+        fflush(zoo_get_log_stream());
+    }
 }
 }
 
 
 void zoo_set_debug_level(ZooLogLevel level)
 void zoo_set_debug_level(ZooLogLevel level)

File diff suppressed because it is too large
+ 184 - 166
src/c/src/zookeeper.c


+ 3 - 3
src/c/tests/PthreadMocks.h

@@ -381,18 +381,18 @@ public:
         int ret=LIBC_SYMBOLS.pthread_create(t,a,threadFuncWrapper,
         int ret=LIBC_SYMBOLS.pthread_create(t,a,threadFuncWrapper,
                 new ThreadContext(f,d));
                 new ThreadContext(f,d));
         if(verbose)
         if(verbose)
-            TEST_TRACE(("thread created %p",*t));
+            TEST_TRACE("thread created %p",*t);
         return ret;
         return ret;
     }
     }
     virtual int pthread_join(pthread_t t, void ** r){
     virtual int pthread_join(pthread_t t, void ** r){
-        if(verbose) TEST_TRACE(("thread joined %p",t));
+        if(verbose) TEST_TRACE("thread joined %p",t);
         int ret=LIBC_SYMBOLS.pthread_join(t,r);
         int ret=LIBC_SYMBOLS.pthread_join(t,r);
         if(ret==0)
         if(ret==0)
             markDestroyed(t);
             markDestroyed(t);
         return ret;
         return ret;
     }
     }
     virtual int pthread_detach(pthread_t t){
     virtual int pthread_detach(pthread_t t){
-        if(verbose) TEST_TRACE(("thread detached %p",t));
+        if(verbose) TEST_TRACE("thread detached %p",t);
         int ret=LIBC_SYMBOLS.pthread_detach(t);
         int ret=LIBC_SYMBOLS.pthread_detach(t);
         if(ret==0)
         if(ret==0)
             markDestroyed(t);
             markDestroyed(t);

+ 82 - 1
src/c/tests/TestClient.cc

@@ -46,6 +46,13 @@ struct buff_struct_2 {
     char *buffer;
     char *buffer;
 };
 };
 
 
+// For testing LogMessage Callback functionality
+list<string> logMessages;
+void logMessageHandler(const char* message) {
+    cout << "Log Message Received: [" << message << "]" << endl;
+    logMessages.push_back(message);
+}
+
 static int Stat_eq(struct Stat* a, struct Stat* b)
 static int Stat_eq(struct Stat* a, struct Stat* b)
 {
 {
     if (a->czxid != b->czxid) return 0;
     if (a->czxid != b->czxid) return 0;
@@ -172,6 +179,7 @@ public:
         }
         }
         return connected;
         return connected;
     }
     }
+
     bool waitForDisconnected(zhandle_t *zh) {
     bool waitForDisconnected(zhandle_t *zh) {
         time_t expires = time(0) + 15;
         time_t expires = time(0) + 15;
         while(connected && time(0) < expires) {
         while(connected && time(0) < expires) {
@@ -179,11 +187,15 @@ public:
         }
         }
         return !connected;
         return !connected;
     }
     }
+
 } watchctx_t;
 } watchctx_t;
 
 
 class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
 class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
 {
 {
     CPPUNIT_TEST_SUITE(Zookeeper_simpleSystem);
     CPPUNIT_TEST_SUITE(Zookeeper_simpleSystem);
+    CPPUNIT_TEST(testLogCallbackSet);
+    CPPUNIT_TEST(testLogCallbackInit);
+    CPPUNIT_TEST(testLogCallbackClear);
     CPPUNIT_TEST(testAsyncWatcherAutoReset);
     CPPUNIT_TEST(testAsyncWatcherAutoReset);
     CPPUNIT_TEST(testDeserializeString);
     CPPUNIT_TEST(testDeserializeString);
 #ifdef THREADED
 #ifdef THREADED
@@ -231,6 +243,13 @@ class Zookeeper_simpleSystem : public CPPUNIT_NS::TestFixture
         return createClient(hostPorts, ctx);
         return createClient(hostPorts, ctx);
     }
     }
 
 
+    zhandle_t *createClient(watchctx_t *ctx, log_callback_fn logCallback) {
+        zhandle_t *zk = zookeeper_init2(hostPorts, watcher, 10000, 0, ctx, 0, logCallback);
+        ctx->zh = zk;
+        sleep(1);
+        return zk;
+    }
+
     zhandle_t *createClient(const char *hp, watchctx_t *ctx) {
     zhandle_t *createClient(const char *hp, watchctx_t *ctx) {
         zhandle_t *zk = zookeeper_init(hp, watcher, 10000, 0, ctx, 0);
         zhandle_t *zk = zookeeper_init(hp, watcher, 10000, 0, ctx, 0);
         ctx->zh = zk;
         ctx->zh = zk;
@@ -265,7 +284,6 @@ public:
         zoo_set_log_stream(logfile);
         zoo_set_log_stream(logfile);
     }
     }
 
 
-
     void startServer() {
     void startServer() {
         char cmd[1024];
         char cmd[1024];
         sprintf(cmd, "%s start %s", ZKSERVER_CMD, getHostPorts());
         sprintf(cmd, "%s start %s", ZKSERVER_CMD, getHostPorts());
@@ -907,6 +925,69 @@ public:
         CPPUNIT_ASSERT_EQUAL(string(path), string(path_buffer));
         CPPUNIT_ASSERT_EQUAL(string(path), string(path_buffer));
     }
     }
 
 
+    // Test creating normal handle via zookeeper_init then explicitly setting callback
+    void testLogCallbackSet()
+    {
+        watchctx_t ctx;
+        CPPUNIT_ASSERT(logMessages.empty());
+        zhandle_t *zk = createClient(&ctx);
+
+        zoo_set_log_callback(zk, &logMessageHandler);
+        CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
+
+        // Log 10 messages and ensure all go to callback
+        int expected = 10;
+        for (int i = 0; i < expected; i++)
+        {
+            LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, i);
+        }
+        CPPUNIT_ASSERT(expected == logMessages.size());
+    }
+
+    // Test creating handle via zookeeper_init2 to ensure all connection messages go to callback
+    void testLogCallbackInit()
+    {
+        logMessages.clear();
+        watchctx_t ctx;
+        zhandle_t *zk = createClient(&ctx, &logMessageHandler);
+        CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
+
+        // All the connection messages should have gone to the callback -- don't
+        // want this to be a maintenance issue so we're not asserting exact count
+        int numBefore = logMessages.size();
+        CPPUNIT_ASSERT(numBefore != 0);
+
+        // Log 10 messages and ensure all go to callback
+        int expected = 10;
+        for (int i = 0; i < expected; i++)
+        {
+            LOG_INFO(LOGCALLBACK(zk), "%s #%d", __FUNCTION__, i);
+        }
+        CPPUNIT_ASSERT(logMessages.size() == numBefore + expected);
+    }
+
+    // Test clearing log callback -- logging should resume going to logstream
+    void testLogCallbackClear()
+    {
+        logMessages.clear();
+        watchctx_t ctx;
+        zhandle_t *zk = createClient(&ctx, &logMessageHandler);
+        CPPUNIT_ASSERT_EQUAL(zoo_get_log_callback(zk), &logMessageHandler);
+
+        // All the connection messages should have gone to the callback -- again, we don't
+        // want this to be a maintenance issue so we're not asserting exact count
+        int numBefore = logMessages.size();
+        CPPUNIT_ASSERT(numBefore > 0);
+
+        // Clear log_callback
+        zoo_set_log_callback(zk, NULL);
+
+        // Future log messages should go to logstream not callback
+        LOG_INFO(LOGCALLBACK(zk), __FUNCTION__);
+        int numAfter = logMessages.size();
+        CPPUNIT_ASSERT_EQUAL(numBefore, numAfter);
+    }
+
     void testAsyncWatcherAutoReset()
     void testAsyncWatcherAutoReset()
     {
     {
         watchctx_t ctx;
         watchctx_t ctx;

+ 4 - 4
src/c/tests/TestOperations.cc

@@ -490,7 +490,7 @@ public:
                     break;
                     break;
                 }
                 }
             }
             }
-            //TEST_TRACE(("Finished %d iterations",i));
+            //TEST_TRACE("Finished %d iterations",i);
         }
         }
         virtual void validate(const char* file, int line) const{
         virtual void validate(const char* file, int line) const{
             CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
             CPPUNIT_ASSERT_EQUAL_MESSAGE_LOC("ZOK != rc",(int)ZOK,rc_,file,line);
@@ -525,7 +525,7 @@ public:
         zookeeper_close(lzh); 
         zookeeper_close(lzh); 
   
   
         for(int counter=0; counter<200; counter++){
         for(int counter=0; counter<200; counter++){
-            TEST_TRACE(("Loop count %d",counter));
+            TEST_TRACE("Loop count %d",counter);
             
             
             CloseFinally guard(&zh);
             CloseFinally guard(&zh);
 
 
@@ -539,7 +539,7 @@ public:
             jmgr.startJobsImmediately();
             jmgr.startJobsImmediately();
             jmgr.wait();
             jmgr.wait();
             VALIDATE_JOBS(jmgr);
             VALIDATE_JOBS(jmgr);
-            TEST_TRACE(("run %d finished",counter));
+            TEST_TRACE("run %d finished",counter);
         }
         }
 
 
     }
     }
@@ -564,7 +564,7 @@ public:
     void testOperationsAndDisconnectConcurrently1()
     void testOperationsAndDisconnectConcurrently1()
     {
     {
         for(int counter=0; counter<50; counter++){
         for(int counter=0; counter<50; counter++){
-            //TEST_TRACE(("Loop count %d",counter));
+            //TEST_TRACE("Loop count %d",counter);
             // frozen time -- no timeouts and no pings
             // frozen time -- no timeouts and no pings
             Mock_gettimeofday timeMock;
             Mock_gettimeofday timeMock;
             
             

+ 2 - 2
src/c/tests/Util.h

@@ -36,8 +36,8 @@
     __real_##sym params
     __real_##sym params
 
 
 // must include "src/zookeeper_log.h" to be able to use this macro
 // must include "src/zookeeper_log.h" to be able to use this macro
-#define TEST_TRACE(x) \
-    log_message(ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,format_log_message x)
+#define TEST_TRACE(x...) \
+    log_message(LOGSTREAM, ZOO_LOG_LEVEL_DEBUG,__LINE__,__func__,x)
 
 
 extern const std::string EMPTY_STRING;
 extern const std::string EMPTY_STRING;
 
 

Some files were not shown because too many files changed in this diff