|
@@ -18,83 +18,95 @@
|
|
*/
|
|
*/
|
|
package org.apache.ambari.logfeeder.output;
|
|
package org.apache.ambari.logfeeder.output;
|
|
|
|
|
|
-import java.io.File;
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.Date;
|
|
|
|
-import java.util.HashMap;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
-import java.util.Map.Entry;
|
|
|
|
-
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+import com.google.gson.Gson;
|
|
|
|
+import com.google.gson.GsonBuilder;
|
|
import org.apache.ambari.logfeeder.LogFeeder;
|
|
import org.apache.ambari.logfeeder.LogFeeder;
|
|
import org.apache.ambari.logfeeder.LogFeederUtil;
|
|
import org.apache.ambari.logfeeder.LogFeederUtil;
|
|
import org.apache.ambari.logfeeder.filter.Filter;
|
|
import org.apache.ambari.logfeeder.filter.Filter;
|
|
import org.apache.ambari.logfeeder.input.InputMarker;
|
|
import org.apache.ambari.logfeeder.input.InputMarker;
|
|
|
|
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
|
|
|
|
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
|
|
|
|
+import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
|
|
|
|
+import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
|
|
import org.apache.ambari.logfeeder.s3.S3Util;
|
|
import org.apache.ambari.logfeeder.s3.S3Util;
|
|
-import org.apache.ambari.logfeeder.util.CompressionUtil;
|
|
|
|
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
|
|
|
|
|
|
+import org.apache.log4j.Logger;
|
|
|
|
+
|
|
|
|
+import java.io.File;
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.Map.Entry;
|
|
|
|
|
|
-import com.google.gson.Gson;
|
|
|
|
-import com.google.gson.GsonBuilder;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Write log file into s3 bucket
|
|
|
|
|
|
+ * Write log file into s3 bucket.
|
|
|
|
+ *
|
|
|
|
+ * This class supports two modes of upload:
|
|
|
|
+ * <ul>
|
|
|
|
+ * <li>A one time upload of files matching a pattern</li>
|
|
|
|
+ * <li>A batch mode, asynchronous, periodic upload of files</li>
|
|
|
|
+ * </ul>
|
|
*/
|
|
*/
|
|
-public class OutputS3File extends Output {
|
|
|
|
|
|
+public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
|
|
|
|
+
|
|
|
|
+ public static final String INPUT_ATTRIBUTE_TYPE = "type";
|
|
|
|
+ public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
|
|
|
|
+ static private Logger logger = Logger.getLogger(OutputS3File.class);
|
|
|
|
+
|
|
|
|
+ private LogSpooler logSpooler;
|
|
|
|
+ private S3OutputConfiguration s3OutputConfiguration;
|
|
|
|
+ private S3Uploader s3Uploader;
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public void init() throws Exception {
|
|
|
|
+ super.init();
|
|
|
|
+ s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
|
|
|
|
+ }
|
|
|
|
|
|
private static boolean uploadedGlobalConfig = false;
|
|
private static boolean uploadedGlobalConfig = false;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Copy local log files and corresponding config to S3 bucket one time.
|
|
|
|
+ * @param inputFile The file to be copied
|
|
|
|
+ * @param inputMarker Contains information about the configuration to be uploaded.
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public void copyFile(File inputFile, InputMarker inputMarker) {
|
|
public void copyFile(File inputFile, InputMarker inputMarker) {
|
|
- String bucketName = getStringValue("s3_bucket");
|
|
|
|
- String s3LogDir = getStringValue("s3_log_dir");
|
|
|
|
- HashMap<String, String> contextParam = buildContextParam();
|
|
|
|
- s3LogDir = PlaceholderUtil.replaceVariables(s3LogDir, contextParam);
|
|
|
|
- String s3AccessKey = getStringValue("s3_access_key");
|
|
|
|
- String s3SecretKey = getStringValue("s3_secret_key");
|
|
|
|
- String compressionAlgo = getStringValue("compression_algo");
|
|
|
|
- String fileName = inputFile.getName();
|
|
|
|
- // create tmp compressed File
|
|
|
|
- String tmpDir = LogFeederUtil.getLogfeederTempDir();
|
|
|
|
- File outputFile = new File(tmpDir + fileName + "_"
|
|
|
|
- + new Date().getTime() + "." + compressionAlgo);
|
|
|
|
- outputFile = CompressionUtil.compressFile(inputFile, outputFile,
|
|
|
|
- compressionAlgo);
|
|
|
|
- String type = inputMarker.input.getStringValue("type");
|
|
|
|
- String s3Path = s3LogDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + type
|
|
|
|
- + S3Util.INSTANCE.S3_PATH_SEPARATOR + fileName + "."
|
|
|
|
- + compressionAlgo;
|
|
|
|
- S3Util.INSTANCE.uploadFileTos3(bucketName, s3Path, outputFile, s3AccessKey,
|
|
|
|
- s3SecretKey);
|
|
|
|
- // delete local compressed file
|
|
|
|
- outputFile.deleteOnExit();
|
|
|
|
- ArrayList<Map<String, Object>> filters = new ArrayList<Map<String, Object>>();
|
|
|
|
|
|
+ String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE);
|
|
|
|
+ S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration,
|
|
|
|
+ S3Util.INSTANCE, false, type);
|
|
|
|
+ String resolvedPath = s3Uploader.uploadFile(inputFile,
|
|
|
|
+ inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
|
|
|
|
+
|
|
|
|
+ uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void uploadConfig(InputMarker inputMarker, String type,
|
|
|
|
+ S3OutputConfiguration s3OutputConfiguration, String resolvedPath) {
|
|
|
|
+
|
|
|
|
+ ArrayList<Map<String, Object>> filters = new ArrayList<>();
|
|
addFilters(filters, inputMarker.input.getFirstFilter());
|
|
addFilters(filters, inputMarker.input.getFirstFilter());
|
|
- Map<String, Object> inputConfig = new HashMap<String, Object>();
|
|
|
|
|
|
+ Map<String, Object> inputConfig = new HashMap<>();
|
|
inputConfig.putAll(inputMarker.input.getConfigs());
|
|
inputConfig.putAll(inputMarker.input.getConfigs());
|
|
- String s3CompletePath = S3Util.INSTANCE.S3_PATH_START_WITH + bucketName
|
|
|
|
- + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Path;
|
|
|
|
|
|
+ String s3CompletePath = S3Util.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName()
|
|
|
|
+ + S3Util.S3_PATH_SEPARATOR + resolvedPath;
|
|
inputConfig.put("path", s3CompletePath);
|
|
inputConfig.put("path", s3CompletePath);
|
|
|
|
|
|
- ArrayList<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
|
|
|
|
|
|
+ ArrayList<Map<String, Object>> inputConfigList = new ArrayList<>();
|
|
inputConfigList.add(inputConfig);
|
|
inputConfigList.add(inputConfig);
|
|
// set source s3_file
|
|
// set source s3_file
|
|
// remove global config from filter config
|
|
// remove global config from filter config
|
|
removeGlobalConfig(inputConfigList);
|
|
removeGlobalConfig(inputConfigList);
|
|
removeGlobalConfig(filters);
|
|
removeGlobalConfig(filters);
|
|
// write config into s3 file
|
|
// write config into s3 file
|
|
- String s3Key = getComponentConfigFileName(type);
|
|
|
|
- Map<String, Object> config = new HashMap<String, Object>();
|
|
|
|
|
|
+ Map<String, Object> config = new HashMap<>();
|
|
config.put("filter", filters);
|
|
config.put("filter", filters);
|
|
config.put("input", inputConfigList);
|
|
config.put("input", inputConfigList);
|
|
- writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, contextParam,
|
|
|
|
- s3Key);
|
|
|
|
|
|
+ writeConfigToS3(config, getComponentConfigFileName(type), s3OutputConfiguration);
|
|
// write global config
|
|
// write global config
|
|
- writeGlobalConfig();
|
|
|
|
|
|
+ writeGlobalConfig(s3OutputConfiguration);
|
|
}
|
|
}
|
|
|
|
|
|
- public void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
|
|
|
|
|
|
+ private void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
|
|
if (filter != null) {
|
|
if (filter != null) {
|
|
Map<String, Object> filterConfig = new HashMap<String, Object>();
|
|
Map<String, Object> filterConfig = new HashMap<String, Object>();
|
|
filterConfig.putAll(filter.getConfigs());
|
|
filterConfig.putAll(filter.getConfigs());
|
|
@@ -105,38 +117,28 @@ public class OutputS3File extends Output {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void writeConfigToS3(Map<String, Object> config, String bucketName,
|
|
|
|
- String accessKey, String secretKey, HashMap<String, String> contextParam,
|
|
|
|
- String s3Key) {
|
|
|
|
- String s3ConfigDir = getStringValue("s3_config_dir");
|
|
|
|
- s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam);
|
|
|
|
|
|
+ private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix,
|
|
|
|
+ S3OutputConfiguration s3OutputConfiguration) {
|
|
Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
|
Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
|
- String configJson = gson.toJson(config);
|
|
|
|
|
|
+ String configJson = gson.toJson(configToWrite);
|
|
|
|
|
|
- s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key;
|
|
|
|
- S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey,
|
|
|
|
- secretKey);
|
|
|
|
- }
|
|
|
|
|
|
+ String s3ResolvedKey = new S3LogPathResolver().
|
|
|
|
+ getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster());
|
|
|
|
|
|
- public String getComponentConfigFileName(String componentName) {
|
|
|
|
- String fileName = "input.config-" + componentName + ".json";
|
|
|
|
- return fileName;
|
|
|
|
|
|
+ S3Util.INSTANCE.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(),
|
|
|
|
+ s3ResolvedKey, s3OutputConfiguration.getS3AccessKey(),
|
|
|
|
+ s3OutputConfiguration.getS3SecretKey());
|
|
}
|
|
}
|
|
|
|
|
|
- public HashMap<String, String> buildContextParam() {
|
|
|
|
- HashMap<String, String> contextParam = new HashMap<String, String>();
|
|
|
|
- contextParam.put("host", LogFeederUtil.hostName);
|
|
|
|
- contextParam.put("ip", LogFeederUtil.ipAddress);
|
|
|
|
- String cluster = getNVList("add_fields").get("cluster");
|
|
|
|
- contextParam.put("cluster", cluster);
|
|
|
|
- return contextParam;
|
|
|
|
|
|
+ private String getComponentConfigFileName(String componentName) {
|
|
|
|
+ return "input.config-" + componentName + ".json";
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
|
|
+
|
|
private Map<String, Object> getGlobalConfig() {
|
|
private Map<String, Object> getGlobalConfig() {
|
|
Map<String, Object> globalConfig = LogFeeder.globalMap;
|
|
Map<String, Object> globalConfig = LogFeeder.globalMap;
|
|
if (globalConfig == null) {
|
|
if (globalConfig == null) {
|
|
- globalConfig = new HashMap<String, Object>();
|
|
|
|
|
|
+ globalConfig = new HashMap<>();
|
|
}
|
|
}
|
|
return globalConfig;
|
|
return globalConfig;
|
|
}
|
|
}
|
|
@@ -163,7 +165,7 @@ public class OutputS3File extends Output {
|
|
* write global config in s3 file Invoke only once
|
|
* write global config in s3 file Invoke only once
|
|
*/
|
|
*/
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
- private synchronized void writeGlobalConfig() {
|
|
|
|
|
|
+ private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) {
|
|
if (!uploadedGlobalConfig) {
|
|
if (!uploadedGlobalConfig) {
|
|
Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig());
|
|
Map<String, Object> globalConfig = LogFeederUtil.cloneObject(getGlobalConfig());
|
|
//updating global config before write to s3
|
|
//updating global config before write to s3
|
|
@@ -174,7 +176,7 @@ public class OutputS3File extends Output {
|
|
Map<String, Object> addFields = (Map<String, Object>) globalConfig
|
|
Map<String, Object> addFields = (Map<String, Object>) globalConfig
|
|
.get("add_fields");
|
|
.get("add_fields");
|
|
if (addFields == null) {
|
|
if (addFields == null) {
|
|
- addFields = new HashMap<String, Object>();
|
|
|
|
|
|
+ addFields = new HashMap<>();
|
|
}
|
|
}
|
|
addFields.put("ip", LogFeederUtil.ipAddress);
|
|
addFields.put("ip", LogFeederUtil.ipAddress);
|
|
addFields.put("host", LogFeederUtil.hostName);
|
|
addFields.put("host", LogFeederUtil.hostName);
|
|
@@ -189,20 +191,85 @@ public class OutputS3File extends Output {
|
|
globalConfig.put("add_fields", addFields);
|
|
globalConfig.put("add_fields", addFields);
|
|
Map<String, Object> config = new HashMap<String, Object>();
|
|
Map<String, Object> config = new HashMap<String, Object>();
|
|
config.put("global", globalConfig);
|
|
config.put("global", globalConfig);
|
|
- String s3AccessKey = getStringValue("s3_access_key");
|
|
|
|
- String s3SecretKey = getStringValue("s3_secret_key");
|
|
|
|
- String bucketName = getStringValue("s3_bucket");
|
|
|
|
- String s3Key = "global.config.json";
|
|
|
|
- HashMap<String, String> contextParam = buildContextParam();
|
|
|
|
- writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey,
|
|
|
|
- contextParam, s3Key);
|
|
|
|
|
|
+ writeConfigToS3(config, GLOBAL_CONFIG_S3_PATH_SUFFIX, s3OutputConfiguration);
|
|
uploadedGlobalConfig = true;
|
|
uploadedGlobalConfig = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Write a log line to local file, to upload to S3 bucket asynchronously.
|
|
|
|
+ *
|
|
|
|
+ * This method uses a {@link LogSpooler} to spool the log lines to a local file.
|
|
|
|
+
|
|
|
|
+ * @param block The log event to upload
|
|
|
|
+ * @param inputMarker Contains information about the log file feeding the lines.
|
|
|
|
+ * @throws Exception
|
|
|
|
+ */
|
|
@Override
|
|
@Override
|
|
public void write(String block, InputMarker inputMarker) throws Exception {
|
|
public void write(String block, InputMarker inputMarker) throws Exception {
|
|
- throw new UnsupportedOperationException(
|
|
|
|
- "write method is not yet supported for output=s3_file");
|
|
|
|
|
|
+ if (logSpooler == null) {
|
|
|
|
+ logSpooler = createSpooler(inputMarker.input.getFilePath());
|
|
|
|
+ s3Uploader = createUploader(inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
|
|
|
|
+ }
|
|
|
|
+ logSpooler.add(block);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ protected S3Uploader createUploader(String logType) {
|
|
|
|
+ S3Uploader uploader = new S3Uploader(s3OutputConfiguration, S3Util.INSTANCE, true, logType);
|
|
|
|
+ uploader.startUploaderThread();
|
|
|
|
+ return uploader;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ protected LogSpooler createSpooler(String filePath) {
|
|
|
|
+ String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service";
|
|
|
|
+ logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s",
|
|
|
|
+ spoolDirectory, filePath));
|
|
|
|
+ return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
|
|
|
|
+ s3OutputConfiguration.getRolloverTimeThresholdSecs());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Check whether the locally spooled file should be rolled over, based on file size.
|
|
|
|
+ *
|
|
|
|
+ * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
|
|
|
|
+ * for rollover.
|
|
|
|
+ * @return true if sufficient size has been reached based on {@link S3OutputConfiguration#getRolloverSizeThresholdBytes()},
|
|
|
|
+ * false otherwise
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
|
|
|
|
+ File spoolFile = currentSpoolerContext.getActiveSpoolFile();
|
|
|
|
+ long currentSize = spoolFile.length();
|
|
|
|
+ boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
|
|
|
|
+ if (result) {
|
|
|
|
+ logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
|
|
|
|
+ s3OutputConfiguration.getRolloverSizeThresholdBytes()));
|
|
|
|
+ }
|
|
|
|
+ return result;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Stops dependent objects that consume resources.
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void close() {
|
|
|
|
+ if (s3Uploader != null) {
|
|
|
|
+ s3Uploader.stopUploaderThread();
|
|
|
|
+ }
|
|
|
|
+ if (logSpooler != null) {
|
|
|
|
+ logSpooler.close();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Adds the locally spooled file to the {@link S3Uploader} to be uploaded asynchronously.
|
|
|
|
+ *
|
|
|
|
+ * @param rolloverFile The file that has been rolled over.
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void handleRollover(File rolloverFile) {
|
|
|
|
+ s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath());
|
|
}
|
|
}
|
|
}
|
|
}
|