Browse Source

HADOOP-5313. Added 10 minutes limit for terminator thread to finish.
Removed static for adaptor.
Removed static for ChunkReceiver.
Removed log(" ") from the busy loop.
Add debug logging.
(Contribute by Jerome Boulon via eyang)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@748357 13f79535-47bb-0310-9956-ffa450edef68

Eric Yang 16 years ago
parent
commit
63f4a27445

+ 36 - 22
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java

@@ -1,38 +1,52 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
-import java.io.IOException;
-import java.util.TimerTask;
-
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
-import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;
-import org.apache.hadoop.chukwa.inputtools.plugin.metrics.ExecHelper;
 import org.apache.log4j.Logger;
 
 public class TerminatorThread extends Thread {
 	private static Logger log =Logger.getLogger(FileAdaptor.class);
-	private static FileTailingAdaptor adaptor = null;
-	private static ChunkReceiver eq = null;
+
+	private FileTailingAdaptor adaptor = null;
+	private ChunkReceiver eq = null;
 	
 	public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
 		this.adaptor = adaptor;
 		this.eq = eq;
 	}
 
-	public void run() {
-   	    log.info("Terminator thread started.");
-  	    try {
-  	    	while(adaptor.tailFile(eq)) {
-  	    		log.info("");
-  	    	}
-		} catch (InterruptedException e) {
-			log.info("Unable to send data to collector for 60 seconds, force shutdown.");
-		}
-        log.info("Terminator finished.");
-        try {
-        	adaptor.reader.close();
-        } catch (IOException ex) {
-        	
+  public void run() {
+    
+    long endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+    int count = 0;
+    log.info("Terminator thread started." + adaptor.toWatch.getPath());
+    try {
+      while (adaptor.tailFile(eq)) {
+        if (log.isDebugEnabled()) {
+          log.debug("Terminator thread:" + adaptor.toWatch.getPath() + " still working");
         }
-	}
+        long now = System.currentTimeMillis();
+        if (now > endTime ) {
+          log.warn("TerminatorThread should have been finished by now! count=" + count);
+          count ++;
+          endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+          if (count >3 ) {
+            log.warn("TerminatorThread should have been finished by now, stopping it now! count=" + count);
+            break;
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      log.info("InterruptedException on Terminator thread:" + adaptor.toWatch.getPath(),e);
+    } catch (Throwable e) {
+      log.warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),e);
+    }
+    
+    log.info("Terminator thread finished." + adaptor.toWatch.getPath());
+    try {
+      adaptor.reader.close();
+    } catch (Throwable ex) {
+      // do nothing
+    }
+  }
 }