@@ -19,28 +19,32 @@
 
 package org.apache.ambari.logfeeder.output;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.ambari.logfeeder.LogFeederUtil;
 import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.util.DateUtil;
+import org.apache.ambari.logfeeder.output.spool.LogSpooler;
+import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
+import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
+import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
 import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
-public class OutputHDFSFile extends Output {
+import java.io.File;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * An {@link Output} that records logs to HDFS.
+ *
+ * The events are spooled on the local file system and uploaded in batches asynchronously.
+ */
+public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition {
   private final static Logger logger = Logger.getLogger(OutputHDFSFile.class);
+  private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L; // 5 min by default
 
   private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
 
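A minimal sketch of the new spool-package collaborators, with shapes inferred only from the calls in this diff (handleRollover(File), shouldRollover(LogSpoolerContext), getActiveLogCreationTime()); the actual definitions in org.apache.ambari.logfeeder.output.spool may differ and carry more members:

package org.apache.ambari.logfeeder.output.spool;

import java.io.File;
import java.util.Date;

interface RolloverHandler {
  // Invoked by the spooler with a spool file that has been rolled over and closed.
  void handleRollover(File rolloverFile);
}

interface RolloverCondition {
  // Consulted by the spooler to decide whether the active spool file should roll over.
  boolean shouldRollover(LogSpoolerContext currentSpoolerContext);
}

class LogSpoolerContext {
  private final File activeSpoolFile;
  private final Date activeLogCreationTime = new Date(); // creation time of the active file

  LogSpoolerContext(File activeSpoolFile) { // assumed constructor
    this.activeSpoolFile = activeSpoolFile;
  }

  File getActiveSpoolFile() { // assumed accessor
    return activeSpoolFile;
  }

  Date getActiveLogCreationTime() {
    return activeLogCreationTime;
  }
}
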
@@ -48,21 +52,15 @@ public class OutputHDFSFile extends Output {
 
   private Thread hdfsCopyThread = null;
 
-  private PrintWriter outWriter = null;
-  // local writer variables
-  private String localFilePath = null;
   private String filenamePrefix = "service-logs-";
-  private String localFileDir = null;
-  private File localcurrentFile = null;
-  private Date localFileCreateTime = null;
-  private long localFileRolloverSec = 5 * 1 * 60;// 5 min by default
+  private long rolloverThresholdTimeMillis;
 
   private String hdfsOutDir = null;
   private String hdfsHost = null;
   private String hdfsPort = null;
   private FileSystem fileSystem = null;
 
-  private String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+  private LogSpooler logSpooler;
 
   @Override
   public void init() throws Exception {
@@ -70,7 +68,8 @@ public class OutputHDFSFile extends Output {
     hdfsOutDir = getStringValue("hdfs_out_dir");
     hdfsHost = getStringValue("hdfs_host");
     hdfsPort = getStringValue("hdfs_port");
-    localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec);
+    long rolloverThresholdTimeSeconds = getLongValue("rollover_sec", DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS);
+    rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
     filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
     if (StringUtils.isEmpty(hdfsOutDir)) {
       logger
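For orientation, a hypothetical sketch of the configuration these accessors read. Only the key names (hdfs_out_dir, hdfs_host, hdfs_port, rollover_sec, file_name_prefix) come from the getStringValue/getLongValue calls above; the values, the class name, and the map representation are invented for illustration:

import java.util.HashMap;
import java.util.Map;

class HdfsOutputConfigExample {
  // Invented sample values for the keys init() reads.
  static Map<String, Object> sample() {
    Map<String, Object> config = new HashMap<>();
    config.put("hdfs_host", "namenode.example.com");
    config.put("hdfs_port", "8020");                             // typical NameNode RPC port
    config.put("hdfs_out_dir", "/user/logfeeder/$HOST/service"); // placeholder syntax assumed
    config.put("rollover_sec", 300L);                            // 300 s -> rolloverThresholdTimeMillis = 300000
    config.put("file_name_prefix", "service-logs-");             // matches the field default above
    return config;
  }
}
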
@@ -90,23 +89,15 @@ public class OutputHDFSFile extends Output {
     HashMap<String, String> contextParam = buildContextParam();
     hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
     logger.info("hdfs Output dir=" + hdfsOutDir);
-    localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
-    localFilePath = localFileDir;
+    String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
+    logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
     this.startHDFSCopyThread();
   }
 
   @Override
   public void close() {
     logger.info("Closing file." + getShortDescription());
-    if (outWriter != null) {
-      try {
-        outWriter.flush();
-        outWriter.close();
-        addFileInReadyList(localcurrentFile);
-      } catch (Throwable t) {
-        logger.error(t.getLocalizedMessage(),t);
-      }
-    }
+    logSpooler.rollover();
     this.stopHDFSCopyThread();
     isClosed = true;
   }
@@ -115,12 +106,8 @@ public class OutputHDFSFile extends Output {
   synchronized public void write(String block, InputMarker inputMarker)
       throws Exception {
     if (block != null) {
-      buildOutWriter();
-      if (outWriter != null) {
-        statMetric.count++;
-        outWriter.println(block);
-        closeFileIfNeeded();
-      }
+      logSpooler.add(block);
+      statMetric.count++;
     }
   }
 
@@ -130,59 +117,6 @@ public class OutputHDFSFile extends Output {
     return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
   }
 
-  private synchronized void closeFileIfNeeded() throws FileNotFoundException,
-      IOException {
-    if (outWriter == null) {
-      return;
-    }
-    // TODO: Close the file on absolute time. Currently it is implemented as
-    // relative time
-    if (System.currentTimeMillis() - localFileCreateTime.getTime() > localFileRolloverSec * 1000) {
-      logger.info("Closing file. Rolling over. name="
-          + localcurrentFile.getName() + ", filePath="
-          + localcurrentFile.getAbsolutePath());
-      try {
-        outWriter.flush();
-        outWriter.close();
-        addFileInReadyList(localcurrentFile);
-      } catch (Throwable t) {
-        logger
-            .error("Error on closing output writter. Exception will be ignored. name="
-                + localcurrentFile.getName()
-                + ", filePath="
-                + localcurrentFile.getAbsolutePath());
-      }
-
-      outWriter = null;
-      localcurrentFile = null;
-    }
-  }
-
-  private synchronized void buildOutWriter() {
-    if (outWriter == null) {
-      String currentFilePath = localFilePath + getCurrentFileName();
-      localcurrentFile = new File(currentFilePath);
-      if (localcurrentFile.getParentFile() != null) {
-        File parentDir = localcurrentFile.getParentFile();
-        if (!parentDir.isDirectory()) {
-          parentDir.mkdirs();
-        }
-      }
-      try {
-        outWriter = new PrintWriter(new BufferedWriter(new FileWriter(
-            localcurrentFile, true)));
-      } catch (IOException e) {
-        logger.error("= OutputHDFSFile.buidOutWriter failed for file : "
-            + localcurrentFile.getAbsolutePath() + " Desc: "
-            + getShortDescription() + " errorMsg: " + e.getLocalizedMessage(),
-            e);
-      }
-      localFileCreateTime = new Date();
-      logger.info("Create file is successful. localFilePath="
-          + localcurrentFile.getAbsolutePath());
-    }
-  }
-
   private void startHDFSCopyThread() {
 
     hdfsCopyThread = new Thread("hdfsCopyThread") {
@@ -261,13 +195,6 @@ public class OutputHDFSFile extends Output {
     }
   }
 
-  private String getCurrentFileName() {
-    Date currentDate = new Date();
-    String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
-    String fileName = filenamePrefix + dateStr;
-    return fileName;
-  }
-
   private HashMap<String, String> buildContextParam() {
     HashMap<String, String> contextParam = new HashMap<String, String>();
     contextParam.put("host", LogFeederUtil.hostName);
@@ -291,4 +218,33 @@ public class OutputHDFSFile extends Output {
     throw new UnsupportedOperationException(
         "copyFile method is not yet supported for output=hdfs");
   }
+
+  /**
+   * Queue a rolled over spool file for upload to HDFS by the background copy thread.
+   * @param rolloverFile the file to be uploaded to HDFS
+   */
+  @Override
+  public void handleRollover(File rolloverFile) {
+    addFileInReadyList(rolloverFile);
+  }
+
+  /**
+   * Determines whether it is time to roll over the current spool file.
+   *
+   * The file rolls over if the time since its creation exceeds the timeout
+   * specified in the rollover_sec configuration.
+   * @param currentSpoolerContext {@link LogSpoolerContext} holding the state of the active spool file
+   * @return true if the time since creation is greater than the value specified in rollover_sec,
+   *         false otherwise.
+   */
+  @Override
+  public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
+    long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
+    boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
+    if (shouldRollover) {
+      logger.info("Time since creation of the active spool file (" + currentSpoolerContext.getActiveLogCreationTime() +
+          ") has crossed the rollover threshold of " + rolloverThresholdTimeMillis + " ms");
+    }
+    return shouldRollover;
+  }
 }
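Taken together, write() now funnels events through the spooler, which consults shouldRollover() after each append and hands rolled over files to handleRollover(), where they join the queue drained by hdfsCopyThread. A minimal sketch of that loop, assuming the simplified spool types sketched earlier; the real LogSpooler in this package may differ in details such as threading and error handling:

package org.apache.ambari.logfeeder.output.spool;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

class SketchLogSpooler {
  private final String spoolDir;
  private final String filePrefix;
  private final RolloverCondition condition;
  private final RolloverHandler handler;
  private File activeFile;
  private PrintWriter writer;
  private LogSpoolerContext context;

  // Mirrors the constructor call in init(): new LogSpooler(dir, prefix, condition, handler).
  SketchLogSpooler(String spoolDir, String filePrefix,
      RolloverCondition condition, RolloverHandler handler) throws IOException {
    this.spoolDir = spoolDir;
    this.filePrefix = filePrefix;
    this.condition = condition;
    this.handler = handler;
    openNewActiveFile();
  }

  // Called from write(): append the event, then let the condition
  // (OutputHDFSFile.shouldRollover) decide whether to roll over.
  synchronized void add(String logEvent) throws IOException {
    writer.println(logEvent);
    if (condition.shouldRollover(context)) {
      rollover();
    }
  }

  // Also called directly from close(): hand the active file to the handler
  // (OutputHDFSFile.handleRollover queues it for upload) and start a fresh one.
  synchronized void rollover() throws IOException {
    writer.close();
    handler.handleRollover(activeFile);
    openNewActiveFile();
  }

  private void openNewActiveFile() throws IOException {
    activeFile = new File(spoolDir, filePrefix + System.currentTimeMillis());
    activeFile.getParentFile().mkdirs();
    writer = new PrintWriter(new FileWriter(activeFile, true));
    context = new LogSpoolerContext(activeFile); // assumed constructor from the sketch above
  }
}

Splitting the decision (RolloverCondition) from the action (RolloverHandler) keeps the spooler reusable: another output could roll over on file size instead of elapsed time without touching the upload path.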