|
@@ -20,14 +20,10 @@
|
|
|
package org.apache.ambari.logfeeder;
|
|
|
|
|
|
import java.io.BufferedInputStream;
|
|
|
-import java.io.BufferedReader;
|
|
|
import java.io.File;
|
|
|
-import java.io.FileInputStream;
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.InputStreamReader;
|
|
|
import java.lang.reflect.Type;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collection;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.Date;
|
|
@@ -39,18 +35,21 @@ import java.util.Set;
|
|
|
|
|
|
import org.apache.ambari.logfeeder.filter.Filter;
|
|
|
import org.apache.ambari.logfeeder.input.Input;
|
|
|
-import org.apache.ambari.logfeeder.input.InputMgr;
|
|
|
+import org.apache.ambari.logfeeder.input.InputManager;
|
|
|
import org.apache.ambari.logfeeder.input.InputSimulate;
|
|
|
-import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
|
|
|
-import org.apache.ambari.logfeeder.metrics.MetricCount;
|
|
|
-import org.apache.ambari.logfeeder.metrics.MetricsMgr;
|
|
|
+import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
|
|
|
+import org.apache.ambari.logfeeder.metrics.MetricData;
|
|
|
+import org.apache.ambari.logfeeder.metrics.MetricsManager;
|
|
|
import org.apache.ambari.logfeeder.output.Output;
|
|
|
-import org.apache.ambari.logfeeder.output.OutputMgr;
|
|
|
+import org.apache.ambari.logfeeder.output.OutputManager;
|
|
|
import org.apache.ambari.logfeeder.util.AliasUtil;
|
|
|
import org.apache.ambari.logfeeder.util.FileUtil;
|
|
|
import org.apache.ambari.logfeeder.util.LogFeederUtil;
|
|
|
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
|
|
|
-import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
+import org.apache.commons.io.FileUtils;
|
|
|
+import org.apache.commons.io.IOUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
|
|
|
import org.apache.hadoop.util.ShutdownHookManager;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.Logger;
|
|
@@ -58,171 +57,142 @@ import org.apache.log4j.Logger;
|
|
|
import com.google.gson.reflect.TypeToken;
|
|
|
|
|
|
public class LogFeeder {
|
|
|
- private static final Logger logger = Logger.getLogger(LogFeeder.class);
|
|
|
+ private static final Logger LOG = Logger.getLogger(LogFeeder.class);
|
|
|
|
|
|
private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
|
|
|
+ private static final int CHECKPOINT_CLEAN_INTERVAL_MS = 24 * 60 * 60 * 60 * 1000; // 24 hours
|
|
|
|
|
|
- private Collection<Output> outputList = new ArrayList<Output>();
|
|
|
+ private OutputManager outputManager = new OutputManager();
|
|
|
+ private InputManager inputManager = new InputManager();
|
|
|
+ private MetricsManager metricsManager = new MetricsManager();
|
|
|
|
|
|
- private OutputMgr outMgr = new OutputMgr();
|
|
|
- private InputMgr inputMgr = new InputMgr();
|
|
|
- private MetricsMgr metricsMgr = new MetricsMgr();
|
|
|
+ public static Map<String, Object> globalConfigs = new HashMap<>();
|
|
|
|
|
|
- public static Map<String, Object> globalMap = null;
|
|
|
- private String[] inputParams;
|
|
|
-
|
|
|
- private List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
|
|
|
- private List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
|
|
|
- private List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
|
|
|
- private List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();
|
|
|
+ private List<Map<String, Object>> inputConfigList = new ArrayList<>();
|
|
|
+ private List<Map<String, Object>> filterConfigList = new ArrayList<>();
|
|
|
+ private List<Map<String, Object>> outputConfigList = new ArrayList<>();
|
|
|
|
|
|
- private int checkPointCleanIntervalMS = 24 * 60 * 60 * 60 * 1000; // 24 hours
|
|
|
private long lastCheckPointCleanedMS = 0;
|
|
|
-
|
|
|
- private static boolean isLogfeederCompleted = false;
|
|
|
-
|
|
|
+ private boolean isLogfeederCompleted = false;
|
|
|
private Thread statLoggerThread = null;
|
|
|
|
|
|
- private LogFeeder(String[] args) {
|
|
|
- inputParams = args;
|
|
|
+ private LogFeeder() {}
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ init();
|
|
|
+ monitor();
|
|
|
+ waitOnAllDaemonThreads();
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.fatal("Caught exception in main.", t);
|
|
|
+ System.exit(1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void init() throws Throwable {
|
|
|
+ Date startTime = new Date();
|
|
|
|
|
|
- LogFeederUtil.loadProperties("logfeeder.properties", inputParams);
|
|
|
-
|
|
|
- String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files");
|
|
|
- logger.info("logfeeder.config.files=" + configFiles);
|
|
|
+ loadConfigFiles();
|
|
|
+ addSimulatedInputs();
|
|
|
+ mergeAllConfigs();
|
|
|
|
|
|
- String[] configFileList = null;
|
|
|
- if (configFiles != null) {
|
|
|
- configFileList = configFiles.split(",");
|
|
|
- }
|
|
|
- //list of config those are there in cmd line config dir , end with .json
|
|
|
- String[] cmdLineConfigs = getConfigFromCmdLine();
|
|
|
- //merge both config
|
|
|
- String mergedConfigList[] = LogFeederUtil.mergeArray(configFileList,
|
|
|
- cmdLineConfigs);
|
|
|
- //mergedConfigList is null then set default conifg
|
|
|
- if (mergedConfigList == null || mergedConfigList.length == 0) {
|
|
|
- mergedConfigList = LogFeederUtil.getStringProperty("config.file",
|
|
|
- "config.json").split(",");
|
|
|
- }
|
|
|
- for (String configFileName : mergedConfigList) {
|
|
|
- logger.info("Going to load config file:" + configFileName);
|
|
|
- //escape space from config file path
|
|
|
- configFileName= configFileName.replace("\\ ", "%20");
|
|
|
+ LogConfigHandler.handleConfig();
|
|
|
+
|
|
|
+ outputManager.init();
|
|
|
+ inputManager.init();
|
|
|
+ metricsManager.init();
|
|
|
+
|
|
|
+ LOG.debug("==============");
|
|
|
+
|
|
|
+ Date endTime = new Date();
|
|
|
+ LOG.info("Took " + (endTime.getTime() - startTime.getTime()) + " ms to initialize");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadConfigFiles() throws Exception {
|
|
|
+ List<String> configFiles = getConfigFiles();
|
|
|
+ for (String configFileName : configFiles) {
|
|
|
+ LOG.info("Going to load config file:" + configFileName);
|
|
|
+ configFileName = configFileName.replace("\\ ", "%20");
|
|
|
File configFile = new File(configFileName);
|
|
|
if (configFile.exists() && configFile.isFile()) {
|
|
|
- logger.info("Config file exists in path."
|
|
|
- + configFile.getAbsolutePath());
|
|
|
+ LOG.info("Config file exists in path." + configFile.getAbsolutePath());
|
|
|
loadConfigsUsingFile(configFile);
|
|
|
} else {
|
|
|
- // Let's try to load it from class loader
|
|
|
- logger.info("Trying to load config file from classloader: "
|
|
|
- + configFileName);
|
|
|
+ LOG.info("Trying to load config file from classloader: " + configFileName);
|
|
|
loadConfigsUsingClassLoader(configFileName);
|
|
|
- logger.info("Loaded config file from classloader: "
|
|
|
- + configFileName);
|
|
|
+ LOG.info("Loaded config file from classloader: " + configFileName);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- addSimulatedInputs();
|
|
|
-
|
|
|
- mergeAllConfigs();
|
|
|
-
|
|
|
- LogfeederScheduler.INSTANCE.start();
|
|
|
-
|
|
|
- outMgr.setOutputList(outputList);
|
|
|
- for (Output output : outputList) {
|
|
|
- output.init();
|
|
|
- }
|
|
|
- inputMgr.init();
|
|
|
- metricsMgr.init();
|
|
|
- logger.debug("==============");
|
|
|
}
|
|
|
|
|
|
- private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
|
|
|
- BufferedInputStream fileInputStream = null;
|
|
|
- BufferedReader br = null;
|
|
|
- try {
|
|
|
- fileInputStream = (BufferedInputStream) this
|
|
|
- .getClass().getClassLoader()
|
|
|
- .getResourceAsStream(configFileName);
|
|
|
- if (fileInputStream != null) {
|
|
|
- br = new BufferedReader(new InputStreamReader(
|
|
|
- fileInputStream));
|
|
|
- String configData = readFile(br);
|
|
|
- loadConfigs(configData);
|
|
|
- } else {
|
|
|
- throw new Exception("Can't find configFile=" + configFileName);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- if (br != null) {
|
|
|
- try {
|
|
|
- br.close();
|
|
|
- } catch (IOException e) {
|
|
|
- }
|
|
|
- }
|
|
|
+ private List<String> getConfigFiles() {
|
|
|
+ List<String> configFiles = new ArrayList<>();
|
|
|
+
|
|
|
+ String logfeederConfigFilesProperty = LogFeederUtil.getStringProperty("logfeeder.config.files");
|
|
|
+ LOG.info("logfeeder.config.files=" + logfeederConfigFilesProperty);
|
|
|
+ if (logfeederConfigFilesProperty != null) {
|
|
|
+ configFiles.addAll(Arrays.asList(logfeederConfigFilesProperty.split(",")));
|
|
|
+ }
|
|
|
|
|
|
- if (fileInputStream != null) {
|
|
|
- try {
|
|
|
- fileInputStream.close();
|
|
|
- } catch (IOException e) {
|
|
|
- }
|
|
|
+ String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
|
|
|
+ if (StringUtils.isNotEmpty(inputConfigDir)) {
|
|
|
+ File configDirFile = new File(inputConfigDir);
|
|
|
+ List<File> inputConfigFiles = FileUtil.getAllFileFromDir(configDirFile, "json", false);
|
|
|
+ for (File inputConfigFile : inputConfigFiles) {
|
|
|
+ configFiles.add(inputConfigFile.getAbsolutePath());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(configFiles)) {
|
|
|
+ String configFileProperty = LogFeederUtil.getStringProperty("config.file", "config.json");
|
|
|
+ configFiles.addAll(Arrays.asList(configFileProperty.split(",")));
|
|
|
+ }
|
|
|
+
|
|
|
+ return configFiles;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This method loads the configurations from the given file.
|
|
|
- */
|
|
|
private void loadConfigsUsingFile(File configFile) throws Exception {
|
|
|
- FileInputStream fileInputStream = null;
|
|
|
try {
|
|
|
- fileInputStream = new FileInputStream(configFile);
|
|
|
- BufferedReader br = new BufferedReader(new InputStreamReader(
|
|
|
- fileInputStream));
|
|
|
- String configData = readFile(br);
|
|
|
+ String configData = FileUtils.readFileToString(configFile);
|
|
|
loadConfigs(configData);
|
|
|
} catch (Exception t) {
|
|
|
- logger.error("Error opening config file. configFilePath="
|
|
|
- + configFile.getAbsolutePath());
|
|
|
+ LOG.error("Error opening config file. configFilePath=" + configFile.getAbsolutePath());
|
|
|
throw t;
|
|
|
- } finally {
|
|
|
- if (fileInputStream != null) {
|
|
|
- try {
|
|
|
- fileInputStream.close();
|
|
|
- } catch (Throwable t) {
|
|
|
- // ignore
|
|
|
- }
|
|
|
- }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadConfigsUsingClassLoader(String configFileName) throws Exception {
|
|
|
+ try (BufferedInputStream fis = (BufferedInputStream) this.getClass().getClassLoader().getResourceAsStream(configFileName)) {
|
|
|
+ String configData = IOUtils.toString(fis);
|
|
|
+ loadConfigs(configData);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
private void loadConfigs(String configData) throws Exception {
|
|
|
- Type type = new TypeToken<Map<String, Object>>() {
|
|
|
- }.getType();
|
|
|
- Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(
|
|
|
- configData, type);
|
|
|
+ Type type = new TypeToken<Map<String, Object>>() {}.getType();
|
|
|
+ Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(configData, type);
|
|
|
|
|
|
// Get the globals
|
|
|
for (String key : configMap.keySet()) {
|
|
|
- if (key.equalsIgnoreCase("global")) {
|
|
|
- globalConfigList.add((Map<String, Object>) configMap.get(key));
|
|
|
- } else if (key.equalsIgnoreCase("input")) {
|
|
|
- List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
|
|
|
- .get(key);
|
|
|
- inputConfigList.addAll(mapList);
|
|
|
- } else if (key.equalsIgnoreCase("filter")) {
|
|
|
- List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
|
|
|
- .get(key);
|
|
|
- filterConfigList.addAll(mapList);
|
|
|
- } else if (key.equalsIgnoreCase("output")) {
|
|
|
- List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
|
|
|
- .get(key);
|
|
|
- outputConfigList.addAll(mapList);
|
|
|
+ switch (key) {
|
|
|
+ case "global" :
|
|
|
+ globalConfigs.putAll((Map<String, Object>) configMap.get(key));
|
|
|
+ break;
|
|
|
+ case "input" :
|
|
|
+ List<Map<String, Object>> inputConfig = (List<Map<String, Object>>) configMap.get(key);
|
|
|
+ inputConfigList.addAll(inputConfig);
|
|
|
+ break;
|
|
|
+ case "filter" :
|
|
|
+ List<Map<String, Object>> filterConfig = (List<Map<String, Object>>) configMap.get(key);
|
|
|
+ filterConfigList.addAll(filterConfig);
|
|
|
+ break;
|
|
|
+ case "output" :
|
|
|
+ List<Map<String, Object>> outputConfig = (List<Map<String, Object>>) configMap.get(key);
|
|
|
+ outputConfigList.addAll(outputConfig);
|
|
|
+ break;
|
|
|
+ default :
|
|
|
+ LOG.warn("Unknown config key: " + key);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -244,231 +214,175 @@ public class LogFeeder {
|
|
|
}
|
|
|
|
|
|
private void mergeAllConfigs() {
|
|
|
- globalMap = mergeConfigs(globalConfigList);
|
|
|
+ loadOutputs();
|
|
|
+ loadInputs();
|
|
|
+ loadFilters();
|
|
|
+
|
|
|
+ assignOutputsToInputs();
|
|
|
+ }
|
|
|
|
|
|
- sortBlocks(filterConfigList);
|
|
|
- // First loop for output
|
|
|
+ private void loadOutputs() {
|
|
|
for (Map<String, Object> map : outputConfigList) {
|
|
|
if (map == null) {
|
|
|
continue;
|
|
|
}
|
|
|
- mergeBlocks(globalMap, map);
|
|
|
+ mergeBlocks(globalConfigs, map);
|
|
|
|
|
|
String value = (String) map.get("destination");
|
|
|
- Output output;
|
|
|
- if (value == null || value.isEmpty()) {
|
|
|
- logger.error("Output block doesn't have destination element");
|
|
|
- continue;
|
|
|
- }
|
|
|
- String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.OUTPUT, ALIAS_PARAM.KLASS);
|
|
|
- if (classFullName == null || classFullName.isEmpty()) {
|
|
|
- logger.error("Destination block doesn't have output element");
|
|
|
+ if (StringUtils.isEmpty(value)) {
|
|
|
+ LOG.error("Output block doesn't have destination element");
|
|
|
continue;
|
|
|
}
|
|
|
- output = (Output) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.OUTPUT);
|
|
|
-
|
|
|
+ Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
|
|
|
if (output == null) {
|
|
|
- logger.error("Destination Object is null");
|
|
|
+ LOG.error("Output object could not be found");
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
output.setDestination(value);
|
|
|
output.loadConfig(map);
|
|
|
|
|
|
- // We will only check for is_enabled out here. Down below we will
|
|
|
- // check whether this output is enabled for the input
|
|
|
- boolean isEnabled = output.getBooleanValue("is_enabled", true);
|
|
|
- if (isEnabled) {
|
|
|
- outputList.add(output);
|
|
|
+ // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
|
|
|
+ if (output.getBooleanValue("is_enabled", true)) {
|
|
|
output.logConfgs(Level.INFO);
|
|
|
+ outputManager.add(output);
|
|
|
} else {
|
|
|
- logger.info("Output is disabled. So ignoring it. "
|
|
|
- + output.getShortDescription());
|
|
|
+ LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- // Second loop for input
|
|
|
+ private void loadInputs() {
|
|
|
for (Map<String, Object> map : inputConfigList) {
|
|
|
if (map == null) {
|
|
|
continue;
|
|
|
}
|
|
|
- mergeBlocks(globalMap, map);
|
|
|
+ mergeBlocks(globalConfigs, map);
|
|
|
|
|
|
String value = (String) map.get("source");
|
|
|
- Input input;
|
|
|
- if (value == null || value.isEmpty()) {
|
|
|
- logger.error("Input block doesn't have source element");
|
|
|
- continue;
|
|
|
- }
|
|
|
- String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.INPUT, ALIAS_PARAM.KLASS);
|
|
|
- if (classFullName == null || classFullName.isEmpty()) {
|
|
|
- logger.error("Source block doesn't have source element");
|
|
|
+ if (StringUtils.isEmpty(value)) {
|
|
|
+ LOG.error("Input block doesn't have source element");
|
|
|
continue;
|
|
|
}
|
|
|
- input = (Input) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.INPUT);
|
|
|
-
|
|
|
+ Input input = (Input) AliasUtil.getClassInstance(value, AliasType.INPUT);
|
|
|
if (input == null) {
|
|
|
- logger.error("Source Object is null");
|
|
|
+ LOG.error("Input object could not be found");
|
|
|
continue;
|
|
|
}
|
|
|
-
|
|
|
input.setType(value);
|
|
|
input.loadConfig(map);
|
|
|
|
|
|
if (input.isEnabled()) {
|
|
|
- input.setOutputMgr(outMgr);
|
|
|
- input.setInputMgr(inputMgr);
|
|
|
- inputMgr.add(input);
|
|
|
+ input.setOutputManager(outputManager);
|
|
|
+ input.setInputManager(inputManager);
|
|
|
+ inputManager.add(input);
|
|
|
input.logConfgs(Level.INFO);
|
|
|
} else {
|
|
|
- logger.info("Input is disabled. So ignoring it. "
|
|
|
- + input.getShortDescription());
|
|
|
+ LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ private void loadFilters() {
|
|
|
+ sortFilters();
|
|
|
|
|
|
- // Third loop is for filter, but we will have to create a filter
|
|
|
- // instance for each input, so it can maintain the state per input
|
|
|
List<Input> toRemoveInputList = new ArrayList<Input>();
|
|
|
- for (Input input : inputMgr.getInputList()) {
|
|
|
- Filter prevFilter = null;
|
|
|
+ for (Input input : inputManager.getInputList()) {
|
|
|
for (Map<String, Object> map : filterConfigList) {
|
|
|
if (map == null) {
|
|
|
continue;
|
|
|
}
|
|
|
- mergeBlocks(globalMap, map);
|
|
|
+ mergeBlocks(globalConfigs, map);
|
|
|
|
|
|
String value = (String) map.get("filter");
|
|
|
- Filter filter;
|
|
|
- if (value == null || value.isEmpty()) {
|
|
|
- logger.error("Filter block doesn't have filter element");
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.FILTER, ALIAS_PARAM.KLASS);
|
|
|
- if (classFullName == null || classFullName.isEmpty()) {
|
|
|
- logger.error("Filter block doesn't have filter element");
|
|
|
+ if (StringUtils.isEmpty(value)) {
|
|
|
+ LOG.error("Filter block doesn't have filter element");
|
|
|
continue;
|
|
|
}
|
|
|
- filter = (Filter) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.FILTER);
|
|
|
-
|
|
|
+ Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
|
|
|
if (filter == null) {
|
|
|
- logger.error("Filter Object is null");
|
|
|
+ LOG.error("Filter object could not be found");
|
|
|
continue;
|
|
|
}
|
|
|
filter.loadConfig(map);
|
|
|
filter.setInput(input);
|
|
|
|
|
|
if (filter.isEnabled()) {
|
|
|
- filter.setOutputMgr(outMgr);
|
|
|
- if (prevFilter == null) {
|
|
|
- input.setFirstFilter(filter);
|
|
|
- } else {
|
|
|
- prevFilter.setNextFilter(filter);
|
|
|
- }
|
|
|
- prevFilter = filter;
|
|
|
+ filter.setOutputManager(outputManager);
|
|
|
+ input.addFilter(filter);
|
|
|
filter.logConfgs(Level.INFO);
|
|
|
} else {
|
|
|
- logger.debug("Ignoring filter "
|
|
|
- + filter.getShortDescription() + " for input "
|
|
|
- + input.getShortDescription());
|
|
|
+ LOG.debug("Ignoring filter " + filter.getShortDescription() + " for input " + input.getShortDescription());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
if (input.getFirstFilter() == null) {
|
|
|
toRemoveInputList.add(input);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Fourth loop is for associating valid outputs to input
|
|
|
- Set<Output> usedOutputSet = new HashSet<Output>();
|
|
|
- for (Input input : inputMgr.getInputList()) {
|
|
|
- for (Output output : outputList) {
|
|
|
- boolean ret = LogFeederUtil.isEnabled(output.getConfigs(),
|
|
|
- input.getConfigs());
|
|
|
- if (ret) {
|
|
|
- usedOutputSet.add(output);
|
|
|
- input.addOutput(output);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- outputList = usedOutputSet;
|
|
|
-
|
|
|
for (Input toRemoveInput : toRemoveInputList) {
|
|
|
- logger.warn("There are no filters, we will ignore this input. "
|
|
|
- + toRemoveInput.getShortDescription());
|
|
|
- inputMgr.removeInput(toRemoveInput);
|
|
|
+ LOG.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
|
|
|
+ inputManager.removeInput(toRemoveInput);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void sortBlocks(List<Map<String, Object>> blockList) {
|
|
|
-
|
|
|
- Collections.sort(blockList, new Comparator<Map<String, Object>>() {
|
|
|
+ private void sortFilters() {
|
|
|
+ Collections.sort(filterConfigList, new Comparator<Map<String, Object>>() {
|
|
|
|
|
|
@Override
|
|
|
public int compare(Map<String, Object> o1, Map<String, Object> o2) {
|
|
|
Object o1Sort = o1.get("sort_order");
|
|
|
Object o2Sort = o2.get("sort_order");
|
|
|
- if (o1Sort == null) {
|
|
|
- return 0;
|
|
|
- }
|
|
|
- if (o2Sort == null) {
|
|
|
+ if (o1Sort == null || o2Sort == null) {
|
|
|
return 0;
|
|
|
}
|
|
|
- int o1Value = 0;
|
|
|
- if (!(o1Sort instanceof Number)) {
|
|
|
- try {
|
|
|
- o1Value = (new Double(Double.parseDouble(o1Sort
|
|
|
- .toString()))).intValue();
|
|
|
- } catch (Throwable t) {
|
|
|
- logger.error("Value is not of type Number. class="
|
|
|
- + o1Sort.getClass().getName() + ", value="
|
|
|
- + o1Sort.toString() + ", map=" + o1.toString());
|
|
|
- }
|
|
|
- } else {
|
|
|
- o1Value = ((Number) o1Sort).intValue();
|
|
|
- }
|
|
|
- int o2Value = 0;
|
|
|
- if (!(o2Sort instanceof Integer)) {
|
|
|
+
|
|
|
+ int o1Value = parseSort(o1, o1Sort);
|
|
|
+ int o2Value = parseSort(o2, o2Sort);
|
|
|
+
|
|
|
+ return o1Value - o2Value;
|
|
|
+ }
|
|
|
+
|
|
|
+ private int parseSort(Map<String, Object> map, Object o) {
|
|
|
+ if (!(o instanceof Number)) {
|
|
|
try {
|
|
|
- o2Value = (new Double(Double.parseDouble(o2Sort
|
|
|
- .toString()))).intValue();
|
|
|
+ return (new Double(Double.parseDouble(o.toString()))).intValue();
|
|
|
} catch (Throwable t) {
|
|
|
- logger.error("Value is not of type Number. class="
|
|
|
- + o2Sort.getClass().getName() + ", value="
|
|
|
- + o2Sort.toString() + ", map=" + o2.toString());
|
|
|
+ LOG.error("Value is not of type Number. class=" + o.getClass().getName() + ", value=" + o.toString()
|
|
|
+ + ", map=" + map.toString());
|
|
|
+ return 0;
|
|
|
}
|
|
|
} else {
|
|
|
-
|
|
|
+ return ((Number) o).intValue();
|
|
|
}
|
|
|
- return o1Value - o2Value;
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private Map<String, Object> mergeConfigs(
|
|
|
- List<Map<String, Object>> configList) {
|
|
|
- Map<String, Object> mergedConfig = new HashMap<String, Object>();
|
|
|
- for (Map<String, Object> config : configList) {
|
|
|
- mergeBlocks(config, mergedConfig);
|
|
|
+ private void assignOutputsToInputs() {
|
|
|
+ Set<Output> usedOutputSet = new HashSet<Output>();
|
|
|
+ for (Input input : inputManager.getInputList()) {
|
|
|
+ for (Output output : outputManager.getOutputs()) {
|
|
|
+ if (LogFeederUtil.isEnabled(output.getConfigs(), input.getConfigs())) {
|
|
|
+ usedOutputSet.add(output);
|
|
|
+ input.addOutput(output);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- return mergedConfig;
|
|
|
+ outputManager.retainUsedOutputs(usedOutputSet);
|
|
|
}
|
|
|
|
|
|
- private void mergeBlocks(Map<String, Object> fromMap,
|
|
|
- Map<String, Object> toMap) {
|
|
|
- // Merge the non-string
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void mergeBlocks(Map<String, Object> fromMap, Map<String, Object> toMap) {
|
|
|
for (String key : fromMap.keySet()) {
|
|
|
Object objValue = fromMap.get(key);
|
|
|
if (objValue == null) {
|
|
|
continue;
|
|
|
}
|
|
|
if (objValue instanceof Map) {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<String, Object> globalFields = LogFeederUtil
|
|
|
- .cloneObject((Map<String, Object>) fromMap.get(key));
|
|
|
+ Map<String, Object> globalFields = LogFeederUtil.cloneObject((Map<String, Object>) objValue);
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- Map<String, Object> localFields = (Map<String, Object>) toMap
|
|
|
- .get(key);
|
|
|
+ Map<String, Object> localFields = (Map<String, Object>) toMap.get(key);
|
|
|
if (localFields == null) {
|
|
|
localFields = new HashMap<String, Object>();
|
|
|
toMap.put(key, localFields);
|
|
@@ -477,8 +391,7 @@ public class LogFeeder {
|
|
|
if (globalFields != null) {
|
|
|
for (String fieldKey : globalFields.keySet()) {
|
|
|
if (!localFields.containsKey(fieldKey)) {
|
|
|
- localFields.put(fieldKey,
|
|
|
- globalFields.get(fieldKey));
|
|
|
+ localFields.put(fieldKey, globalFields.get(fieldKey));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -493,11 +406,29 @@ public class LogFeeder {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private class JVMShutdownHook extends Thread {
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ LOG.info("Processing is shutting down.");
|
|
|
+
|
|
|
+ inputManager.close();
|
|
|
+ outputManager.close();
|
|
|
+ inputManager.checkInAll();
|
|
|
+
|
|
|
+ logStats();
|
|
|
+
|
|
|
+ LOG.info("LogSearch is exiting.");
|
|
|
+ } catch (Throwable t) {
|
|
|
+ // Ignore
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void monitor() throws Exception {
|
|
|
- inputMgr.monitor();
|
|
|
+ inputManager.monitor();
|
|
|
JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
|
|
|
- ShutdownHookManager.get().addShutdownHook(logfeederJVMHook,
|
|
|
- LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
|
|
|
+ ShutdownHookManager.get().addShutdownHook(logfeederJVMHook, LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
|
|
|
|
|
|
statLoggerThread = new Thread("statLogger") {
|
|
|
|
|
@@ -512,17 +443,14 @@ public class LogFeeder {
|
|
|
try {
|
|
|
logStats();
|
|
|
} catch (Throwable t) {
|
|
|
- logger.error(
|
|
|
- "LogStats: Caught exception while logging stats.",
|
|
|
- t);
|
|
|
+ LOG.error("LogStats: Caught exception while logging stats.", t);
|
|
|
}
|
|
|
|
|
|
- if (System.currentTimeMillis() > (lastCheckPointCleanedMS + checkPointCleanIntervalMS)) {
|
|
|
+ if (System.currentTimeMillis() > (lastCheckPointCleanedMS + CHECKPOINT_CLEAN_INTERVAL_MS)) {
|
|
|
lastCheckPointCleanedMS = System.currentTimeMillis();
|
|
|
- inputMgr.cleanCheckPointFiles();
|
|
|
+ inputManager.cleanCheckPointFiles();
|
|
|
}
|
|
|
|
|
|
- // logfeeder is stopped then break the loop
|
|
|
if (isLogfeederCompleted) {
|
|
|
break;
|
|
|
}
|
|
@@ -536,84 +464,20 @@ public class LogFeeder {
|
|
|
}
|
|
|
|
|
|
private void logStats() {
|
|
|
- inputMgr.logStats();
|
|
|
- outMgr.logStats();
|
|
|
-
|
|
|
- if (metricsMgr.isMetricsEnabled()) {
|
|
|
- List<MetricCount> metricsList = new ArrayList<MetricCount>();
|
|
|
- inputMgr.addMetricsContainers(metricsList);
|
|
|
- outMgr.addMetricsContainers(metricsList);
|
|
|
- metricsMgr.useMetrics(metricsList);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private String readFile(BufferedReader br) throws Exception {
|
|
|
- try {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- String line = br.readLine();
|
|
|
- while (line != null) {
|
|
|
- sb.append(line);
|
|
|
- line = br.readLine();
|
|
|
- }
|
|
|
- return sb.toString();
|
|
|
- } catch (Exception t) {
|
|
|
- logger.error("Error loading properties file.", t);
|
|
|
- throw t;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public Collection<Output> getOutputList() {
|
|
|
- return outputList;
|
|
|
- }
|
|
|
-
|
|
|
- public OutputMgr getOutMgr() {
|
|
|
- return outMgr;
|
|
|
- }
|
|
|
-
|
|
|
- public static void main(String[] args) {
|
|
|
- LogFeeder logFeeder = new LogFeeder(args);
|
|
|
- logFeeder.run();
|
|
|
- }
|
|
|
-
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- Date startTime = new Date();
|
|
|
- this.init();
|
|
|
- Date endTime = new Date();
|
|
|
- logger.info("Took " + (endTime.getTime() - startTime.getTime())
|
|
|
- + " ms to initialize");
|
|
|
- this.monitor();
|
|
|
- //wait for all background thread before stop main thread
|
|
|
- this.waitOnAllDaemonThreads();
|
|
|
- } catch (Throwable t) {
|
|
|
- logger.fatal("Caught exception in main.", t);
|
|
|
- System.exit(1);
|
|
|
+ inputManager.logStats();
|
|
|
+ outputManager.logStats();
|
|
|
+
|
|
|
+ if (metricsManager.isMetricsEnabled()) {
|
|
|
+ List<MetricData> metricsList = new ArrayList<MetricData>();
|
|
|
+ inputManager.addMetricsContainers(metricsList);
|
|
|
+ outputManager.addMetricsContainers(metricsList);
|
|
|
+ metricsManager.useMetrics(metricsList);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private class JVMShutdownHook extends Thread {
|
|
|
-
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- logger.info("Processing is shutting down.");
|
|
|
-
|
|
|
- inputMgr.close();
|
|
|
- outMgr.close();
|
|
|
- inputMgr.checkInAll();
|
|
|
-
|
|
|
- logStats();
|
|
|
-
|
|
|
- logger.info("LogSearch is exiting.");
|
|
|
- } catch (Throwable t) {
|
|
|
- // Ignore
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private void waitOnAllDaemonThreads() {
|
|
|
- String foreground = LogFeederUtil.getStringProperty("foreground");
|
|
|
- if (foreground != null && foreground.equalsIgnoreCase("true")) {
|
|
|
- inputMgr.waitOnAllInputs();
|
|
|
+ if ("true".equals(LogFeederUtil.getStringProperty("foreground"))) {
|
|
|
+ inputManager.waitOnAllInputs();
|
|
|
isLogfeederCompleted = true;
|
|
|
if (statLoggerThread != null) {
|
|
|
try {
|
|
@@ -624,24 +488,16 @@ public class LogFeeder {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private String[] getConfigFromCmdLine() {
|
|
|
- String inputConfigDir = LogFeederUtil.getStringProperty("input_config_dir");
|
|
|
- if (inputConfigDir != null && !inputConfigDir.isEmpty()) {
|
|
|
- String[] searchFileWithExtensions = new String[] { "json" };
|
|
|
- File configDirFile = new File(inputConfigDir);
|
|
|
- List<File> configFiles = FileUtil.getAllFileFromDir(configDirFile,
|
|
|
- searchFileWithExtensions, false);
|
|
|
- if (configFiles != null && configFiles.size() > 0) {
|
|
|
- String configPaths[] = new String[configFiles.size()];
|
|
|
- for (int index = 0; index < configFiles.size(); index++) {
|
|
|
- File configFile = configFiles.get(index);
|
|
|
- String configFilePath = configFile.getAbsolutePath();
|
|
|
- configPaths[index] = configFilePath;
|
|
|
- }
|
|
|
- return configPaths;
|
|
|
- }
|
|
|
+
|
|
|
+ public static void main(String[] args) {
|
|
|
+ try {
|
|
|
+ LogFeederUtil.loadProperties("logfeeder.properties", args);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.warn("Could not load logfeeder properites");
|
|
|
+ System.exit(1);
|
|
|
}
|
|
|
- return new String[0];
|
|
|
+
|
|
|
+ LogFeeder logFeeder = new LogFeeder();
|
|
|
+ logFeeder.run();
|
|
|
}
|
|
|
}
|