|
@@ -25,12 +25,14 @@ import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
|
|
import org.apache.ambari.logfeeder.common.ConfigBlock;
|
|
import org.apache.ambari.logfeeder.common.ConfigBlock;
|
|
import org.apache.ambari.logfeeder.common.LogfeederException;
|
|
import org.apache.ambari.logfeeder.common.LogfeederException;
|
|
import org.apache.ambari.logfeeder.filter.Filter;
|
|
import org.apache.ambari.logfeeder.filter.Filter;
|
|
import org.apache.ambari.logfeeder.metrics.MetricData;
|
|
import org.apache.ambari.logfeeder.metrics.MetricData;
|
|
import org.apache.ambari.logfeeder.output.Output;
|
|
import org.apache.ambari.logfeeder.output.Output;
|
|
import org.apache.ambari.logfeeder.output.OutputManager;
|
|
import org.apache.ambari.logfeeder.output.OutputManager;
|
|
|
|
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
|
|
import org.apache.log4j.Logger;
|
|
import org.apache.log4j.Logger;
|
|
|
|
|
|
public abstract class Input extends ConfigBlock implements Runnable {
|
|
public abstract class Input extends ConfigBlock implements Runnable {
|
|
@@ -39,7 +41,18 @@ public abstract class Input extends ConfigBlock implements Runnable {
|
|
private static final boolean DEFAULT_TAIL = true;
|
|
private static final boolean DEFAULT_TAIL = true;
|
|
private static final boolean DEFAULT_USE_EVENT_MD5 = false;
|
|
private static final boolean DEFAULT_USE_EVENT_MD5 = false;
|
|
private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
|
|
private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
|
|
-
|
|
|
|
|
|
+ private static final boolean DEFAULT_CACHE_ENABLED = false;
|
|
|
|
+ private static final boolean DEFAULT_CACHE_DEDUP_LAST = false;
|
|
|
|
+ private static final int DEFAULT_CACHE_SIZE = 100;
|
|
|
|
+ private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
|
|
|
|
+ private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
|
|
|
|
+
|
|
|
|
+ private static final String CACHE_ENABLED = "cache_enabled";
|
|
|
|
+ private static final String CACHE_KEY_FIELD = "cache_key_field";
|
|
|
|
+ private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled";
|
|
|
|
+ private static final String CACHE_SIZE = "cache_size";
|
|
|
|
+ private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval";
|
|
|
|
+
|
|
protected InputManager inputManager;
|
|
protected InputManager inputManager;
|
|
protected OutputManager outputManager;
|
|
protected OutputManager outputManager;
|
|
private List<Output> outputList = new ArrayList<Output>();
|
|
private List<Output> outputList = new ArrayList<Output>();
|
|
@@ -54,6 +67,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
|
|
private boolean useEventMD5;
|
|
private boolean useEventMD5;
|
|
private boolean genEventMD5;
|
|
private boolean genEventMD5;
|
|
|
|
|
|
|
|
+ private LRUCache cache;
|
|
|
|
+ private String cacheKeyField;
|
|
|
|
+
|
|
protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
|
|
protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
|
|
protected String getReadBytesMetricName() {
|
|
protected String getReadBytesMetricName() {
|
|
return null;
|
|
return null;
|
|
@@ -107,6 +123,7 @@ public abstract class Input extends ConfigBlock implements Runnable {
|
|
@Override
|
|
@Override
|
|
public void init() throws Exception {
|
|
public void init() throws Exception {
|
|
super.init();
|
|
super.init();
|
|
|
|
+ initCache();
|
|
tail = getBooleanValue("tail", DEFAULT_TAIL);
|
|
tail = getBooleanValue("tail", DEFAULT_TAIL);
|
|
useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
|
|
useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
|
|
genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
|
|
genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
|
|
@@ -114,6 +131,7 @@ public abstract class Input extends ConfigBlock implements Runnable {
|
|
if (firstFilter != null) {
|
|
if (firstFilter != null) {
|
|
firstFilter.init();
|
|
firstFilter.init();
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
boolean monitor() {
|
|
boolean monitor() {
|
|
@@ -217,6 +235,33 @@ public abstract class Input extends ConfigBlock implements Runnable {
|
|
isClosed = true;
|
|
isClosed = true;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void initCache() {
|
|
|
|
+ boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null
|
|
|
|
+ ? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED)
|
|
|
|
+ : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
|
|
|
|
+ if (cacheEnabled) {
|
|
|
|
+ String cacheKeyField = getConfigValue(CACHE_KEY_FIELD) != null
|
|
|
|
+ ? getStringValue(CACHE_KEY_FIELD)
|
|
|
|
+ : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
|
|
|
|
+
|
|
|
|
+ setCacheKeyField(getStringValue(cacheKeyField));
|
|
|
|
+
|
|
|
|
+ boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null
|
|
|
|
+ ? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST)
|
|
|
|
+ : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
|
|
|
|
+
|
|
|
|
+ int cacheSize = getConfigValue(CACHE_SIZE) != null
|
|
|
|
+ ? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE)
|
|
|
|
+ : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
|
|
|
|
+
|
|
|
|
+ long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null
|
|
|
|
+ ? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL)
|
|
|
|
+ : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
|
|
|
|
+
|
|
|
|
+ setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public boolean isTail() {
|
|
public boolean isTail() {
|
|
return tail;
|
|
return tail;
|
|
}
|
|
}
|
|
@@ -257,6 +302,22 @@ public abstract class Input extends ConfigBlock implements Runnable {
|
|
return thread;
|
|
return thread;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public LRUCache getCache() {
|
|
|
|
+ return cache;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setCache(LRUCache cache) {
|
|
|
|
+ this.cache = cache;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String getCacheKeyField() {
|
|
|
|
+ return cacheKeyField;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setCacheKeyField(String cacheKeyField) {
|
|
|
|
+ this.cacheKeyField = cacheKeyField;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public String getNameForThread() {
|
|
public String getNameForThread() {
|
|
if (filePath != null) {
|
|
if (filePath != null) {
|