|
@@ -24,14 +24,7 @@ import java.io.File;
|
|
|
import java.io.FileFilter;
|
|
|
import java.io.IOException;
|
|
|
import java.io.RandomAccessFile;
|
|
|
-import java.nio.file.FileSystems;
|
|
|
-import java.nio.file.Path;
|
|
|
-import java.nio.file.Paths;
|
|
|
-import java.nio.file.WatchEvent;
|
|
|
-import java.nio.file.WatchKey;
|
|
|
-import java.nio.file.WatchService;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
@@ -39,8 +32,6 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.UUID;
|
|
|
|
|
|
-import static java.nio.file.StandardWatchEventKinds.*;
|
|
|
-
|
|
|
import org.apache.ambari.logfeeder.input.Input;
|
|
|
import org.apache.ambari.logfeeder.input.InputFile;
|
|
|
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
|
@@ -48,21 +39,18 @@ import org.apache.log4j.Logger;
|
|
|
import org.apache.solr.common.util.Base64;
|
|
|
|
|
|
public class InputMgr {
|
|
|
- static Logger logger = Logger.getLogger(InputMgr.class);
|
|
|
+ private static final Logger logger = Logger.getLogger(InputMgr.class);
|
|
|
|
|
|
- List<Input> inputList = new ArrayList<Input>();
|
|
|
- Set<Input> notReadyList = new HashSet<Input>();
|
|
|
+ private List<Input> inputList = new ArrayList<Input>();
|
|
|
+ private Set<Input> notReadyList = new HashSet<Input>();
|
|
|
|
|
|
- WatchService folderWatcher = null;
|
|
|
- Set<File> foldersToMonitor = new HashSet<File>();
|
|
|
- Map<String, Input> filesToMonitor = new HashMap<String, Input>();
|
|
|
- boolean isDrain = false;
|
|
|
- boolean isAnyInputTail = false;
|
|
|
+ private boolean isDrain = false;
|
|
|
+ private boolean isAnyInputTail = false;
|
|
|
|
|
|
private String checkPointSubFolderName = "logfeeder_checkpoints";
|
|
|
- File checkPointFolderFile = null;
|
|
|
+ private File checkPointFolderFile = null;
|
|
|
|
|
|
- MetricCount filesCountMetric = new MetricCount();
|
|
|
+ private MetricCount filesCountMetric = new MetricCount();
|
|
|
|
|
|
private String checkPointExtension = ".cp";
|
|
|
|
|
@@ -172,7 +160,7 @@ public class InputMgr {
|
|
|
return checkPointFolderFile;
|
|
|
}
|
|
|
|
|
|
- boolean verifyCheckPointFolder(File folderPathFile) {
|
|
|
+ private boolean verifyCheckPointFolder(File folderPathFile) {
|
|
|
if (!folderPathFile.exists()) {
|
|
|
// Create the folder
|
|
|
try {
|
|
@@ -434,98 +422,6 @@ public class InputMgr {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- synchronized public void monitorSystemFileChanges(Input inputToMonitor) {
|
|
|
- try {
|
|
|
- File fileToMonitor = new File(inputToMonitor.getFilePath());
|
|
|
- if (filesToMonitor.containsKey(fileToMonitor.getAbsolutePath())) {
|
|
|
- logger.info("Already monitoring file " + fileToMonitor
|
|
|
- + ". So ignoring this request");
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // make a new watch service that we can register interest in
|
|
|
- // directories and files with.
|
|
|
- if (folderWatcher == null) {
|
|
|
- folderWatcher = FileSystems.getDefault().newWatchService();
|
|
|
- // start the file watcher thread below
|
|
|
- Thread th = new Thread(new FileSystemMonitor(),
|
|
|
- "FileSystemWatcher");
|
|
|
- th.setDaemon(true);
|
|
|
- th.start();
|
|
|
-
|
|
|
- }
|
|
|
- File folderToWatch = fileToMonitor.getParentFile();
|
|
|
- if (folderToWatch != null) {
|
|
|
- if (foldersToMonitor.contains(folderToWatch.getAbsolutePath())) {
|
|
|
- logger.info("Already monitoring folder " + folderToWatch
|
|
|
- + ". So ignoring this request.");
|
|
|
- } else {
|
|
|
- logger.info("Configuring to monitor folder "
|
|
|
- + folderToWatch + " for file " + fileToMonitor);
|
|
|
- // get the directory we want to watch, using the Paths singleton class
|
|
|
- Path toWatch = Paths.get(folderToWatch.getAbsolutePath());
|
|
|
- if (toWatch == null) {
|
|
|
- throw new UnsupportedOperationException(
|
|
|
- "Directory not found. folder=" + folderToWatch);
|
|
|
- }
|
|
|
-
|
|
|
- toWatch.register(folderWatcher, ENTRY_CREATE);
|
|
|
- foldersToMonitor.add(folderToWatch);
|
|
|
- }
|
|
|
- filesToMonitor.put(fileToMonitor.getAbsolutePath(),
|
|
|
- inputToMonitor);
|
|
|
- } else {
|
|
|
- logger.error("File doesn't have parent folder." + fileToMonitor);
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- logger.error("Error while trying to set watcher for file:"
|
|
|
- + inputToMonitor);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- class FileSystemMonitor implements Runnable {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- // get the first event before looping
|
|
|
- WatchKey key = folderWatcher.take();
|
|
|
- while (key != null) {
|
|
|
- Path dir = (Path) key.watchable();
|
|
|
- // we have a polled event, now we traverse it and
|
|
|
- // receive all the states from it
|
|
|
- for (WatchEvent<?> event : key.pollEvents()) {
|
|
|
- if (!event.kind().equals(ENTRY_CREATE)) {
|
|
|
- logger.info("Ignoring event.kind=" + event.kind());
|
|
|
- continue;
|
|
|
- }
|
|
|
- logger.info("Received " + event.kind()
|
|
|
- + " event for file " + event.context());
|
|
|
-
|
|
|
- File newFile = new File(dir.toFile(), event.context()
|
|
|
- .toString());
|
|
|
- Input rolledOverInput = filesToMonitor.get(newFile
|
|
|
- .getAbsolutePath());
|
|
|
- if (rolledOverInput == null) {
|
|
|
- logger.info("Input not found for file " + newFile);
|
|
|
- } else {
|
|
|
- rolledOverInput.rollOver();
|
|
|
- }
|
|
|
- }
|
|
|
- if (!key.reset()) {
|
|
|
- logger.error("Error while key.reset(). Will have to abort watching files. Rollover will not work.");
|
|
|
- break;
|
|
|
- }
|
|
|
- key = folderWatcher.take();
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- logger.info("Stop request for thread");
|
|
|
- }
|
|
|
- logger.info("Exiting FileSystemMonitor thread.");
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
public void waitOnAllInputs() {
|
|
|
//wait on inputs
|
|
|
if (inputList != null) {
|