|
@@ -51,9 +51,12 @@ public class FileTailingAdaptor implements Adaptor
|
|
public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ;
|
|
public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ;
|
|
public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ;
|
|
public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ;
|
|
public static int MAX_RETRIES = 300;
|
|
public static int MAX_RETRIES = 300;
|
|
|
|
+ public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
|
|
|
|
+
|
|
protected static Configuration conf = null;
|
|
protected static Configuration conf = null;
|
|
private int attempts = 0;
|
|
private int attempts = 0;
|
|
-
|
|
|
|
|
|
+ private long gracefulPeriodExpired = 0l;
|
|
|
|
+ private boolean adaptorInError = false;
|
|
File toWatch;
|
|
File toWatch;
|
|
/**
|
|
/**
|
|
* next PHYSICAL offset to read
|
|
* next PHYSICAL offset to read
|
|
@@ -155,22 +158,47 @@ public class FileTailingAdaptor implements Adaptor
|
|
*/
|
|
*/
|
|
public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
|
|
public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
|
|
boolean hasMoreData = false;
|
|
boolean hasMoreData = false;
|
|
|
|
+
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- if(!toWatch.exists() && attempts>MAX_RETRIES) {
|
|
|
|
- log.warn("Adaptor|" + adaptorID +"| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
|
|
|
|
- ChukwaAgent agent = ChukwaAgent.getAgent();
|
|
|
|
- if (agent != null) {
|
|
|
|
- agent.stopAdaptor(adaptorID, false);
|
|
|
|
- } else {
|
|
|
|
- log.info("Agent is null, running in default mode");
|
|
|
|
- }
|
|
|
|
- tailer.stopWatchingFile(this);
|
|
|
|
- return false;
|
|
|
|
- } else if(!toWatch.exists()) {
|
|
|
|
- log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts+" of "+MAX_RETRIES);
|
|
|
|
- attempts++;
|
|
|
|
- return false; //no more data
|
|
|
|
|
|
+ if( (adaptorInError == true) && (System.currentTimeMillis() > gracefulPeriodExpired)) {
|
|
|
|
+ if (!toWatch.exists()) {
|
|
|
|
+ log.warn("Adaptor|" + adaptorID +"|attempts=" + attempts + "| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
|
|
|
|
+ } else if (!toWatch.canRead()) {
|
|
|
|
+ log.warn("Adaptor|" + adaptorID +"|attempts=" + attempts + "| File cannot be read: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
|
|
|
|
+ } else {
|
|
|
|
+ // Should have never been there
|
|
|
|
+ adaptorInError = false;
|
|
|
|
+ gracefulPeriodExpired = 0L;
|
|
|
|
+ attempts = 0;
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ ChukwaAgent agent = ChukwaAgent.getAgent();
|
|
|
|
+ if (agent != null) {
|
|
|
|
+ agent.stopAdaptor(adaptorID, false);
|
|
|
|
+ } else {
|
|
|
|
+ log.info("Agent is null, running in default mode");
|
|
|
|
+ }
|
|
|
|
+ return false;
|
|
|
|
+
|
|
|
|
+ } else if(!toWatch.exists() || !toWatch.canRead()) {
|
|
|
|
+ if (adaptorInError == false) {
|
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
|
+ gracefulPeriodExpired = now + GRACEFUL_PERIOD;
|
|
|
|
+ adaptorInError = true;
|
|
|
|
+ attempts = 0;
|
|
|
|
+ log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", graceful period will Expire at now:" + now
|
|
|
|
+ + " + " + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
|
|
|
|
+ } else if (attempts%10 == 0) {
|
|
|
|
+ log.info("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ attempts++;
|
|
|
|
+ return false; //no more data
|
|
|
|
+ }
|
|
|
|
+
|
|
if (reader == null)
|
|
if (reader == null)
|
|
{
|
|
{
|
|
reader = new RandomAccessFile(toWatch, "r");
|
|
reader = new RandomAccessFile(toWatch, "r");
|
|
@@ -281,6 +309,7 @@ public class FileTailingAdaptor implements Adaptor
|
|
log.warn("failure reading " + toWatch, e);
|
|
log.warn("failure reading " + toWatch, e);
|
|
}
|
|
}
|
|
attempts=0;
|
|
attempts=0;
|
|
|
|
+ adaptorInError = false;
|
|
return hasMoreData;
|
|
return hasMoreData;
|
|
}
|
|
}
|
|
|
|
|