Преглед на файлове

ZOOKEEPER-886. Hedwig Server stays in "disconnected" state when connection to ZK dies but gets reconnected

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@1021501 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed преди 14 години
родител
ревизия
fe2f2ced4e
променени са 2 файла, в които са добавени 12 реда и са изтрити 5 реда
  1. 2 0
      CHANGES.txt
  2. 10 5
      src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java

+ 2 - 0
CHANGES.txt

@@ -110,6 +110,8 @@ BUGFIXES:
   ZOOKEEPER-822. Leader election taking a long time to complete
   (Vishal K via phunt)
 
+  ZOOKEEPER-866. Hedwig Server stays in "disconnected" state when connection to ZK dies but gets reconnected (erwin tam via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

+ 10 - 5
src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java

@@ -92,14 +92,19 @@ public class ZkTopicManager extends AbstractTopicManager implements TopicManager
                     if (event.getState().equals(Watcher.Event.KeeperState.Disconnected)) {
                         logger.warn("ZK client has been disconnected to the ZK server!");
                         isSuspended = true;
+                    } else if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
+			if (isSuspended) {
+	                    logger.info("ZK client has been reconnected to the ZK server!");
+			}
+			isSuspended = false;
                     }
-                } else if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
+		}
+		// Check for expired connection.
+                if (event.getState().equals(Watcher.Event.KeeperState.Expired)) {
                     logger.error("ZK client connection to the ZK server has expired!");
                     System.exit(1);
-                } else if (event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
-                    isSuspended = false;
-                }
-            }
+                }             
+	    }
         });
         final SynchronousQueue<Either<Void, PubSubException>> queue = new SynchronousQueue<Either<Void, PubSubException>>();