Переглянути джерело

AMBARI-17834. Logfeeder: Removed file that is processed by hdfsCopyThread from readylist (Hayat Behlim via oleewere)

oleewere 9 роки тому
батько
коміт
d65072df10

+ 37 - 16
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java

@@ -35,6 +35,7 @@ import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
 import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
@@ -71,9 +72,19 @@ public class OutputHDFSFile extends Output {
     hdfsPort = getStringValue("hdfs_port");
     localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec);
     filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
-    if (hdfsOutDir == null || hdfsOutDir.isEmpty()) {
+    if (StringUtils.isEmpty(hdfsOutDir)) {
       logger
-          .error("Filepath config property <path> is not set in config file.");
+          .error("HDFS config property <hdfs_out_dir> is not set in config file.");
+      return;
+    }
+    if (StringUtils.isEmpty(hdfsHost)) {
+      logger
+          .error("HDFS config property <hdfs_host> is not set in config file.");
+      return;
+    }
+    if (StringUtils.isEmpty(hdfsPort)) {
+      logger
+          .error("HDFS config property <hdfs_port> is not set in config file.");
       return;
     }
     HashMap<String, String> contextParam = buildContextParam();
@@ -93,7 +104,7 @@ public class OutputHDFSFile extends Output {
         outWriter.close();
         addFileInReadyList(localcurrentFile);
       } catch (Throwable t) {
-        // Ignore this exception
+        logger.error(t.getLocalizedMessage(),t);
       }
     }
     this.stopHDFSCopyThread();
@@ -147,7 +158,7 @@ public class OutputHDFSFile extends Output {
     }
   }
 
-  public synchronized void buildOutWriter() {
+  private synchronized void buildOutWriter() {
     if (outWriter == null) {
       String currentFilePath = localFilePath + getCurrentFileName();
       localcurrentFile = new File(currentFilePath);
@@ -201,19 +212,18 @@ public class OutputHDFSFile extends Output {
                   logger.error("Hdfs file copy  failed for hdfspath :"
                       + destFilePath + " and localpath :" + localPath);
                 }
-
               }
-
+              localFileIterator.remove();
             }
             try {
               // wait till new file comes in reayList
               synchronized (readyMonitor) {
-                if (localReadyFiles.size() == 0) {
+                if (localReadyFiles.isEmpty()) {
                   readyMonitor.wait();
                 }
               }
             } catch (InterruptedException e) {
-              // ignore
+              logger.error(e.getLocalizedMessage(),e);
             }
           }
         } catch (Exception e) {
@@ -231,36 +241,47 @@ public class OutputHDFSFile extends Output {
   private void stopHDFSCopyThread() {
     if (hdfsCopyThread != null) {
       logger.info("waiting till copy all local files to hdfs.......");
-      while (localReadyFiles.size() != 0) {
-
+      while (!localReadyFiles.isEmpty()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          logger.error(e.getLocalizedMessage(), e);
+        }
+        logger.debug("still waiting to copy all local files to hdfs.......");
       }
       logger.info("calling interrupt method for hdfsCopyThread to stop it.");
-      hdfsCopyThread.interrupt();
+      try {
+        hdfsCopyThread.interrupt();
+      } catch (SecurityException exception) {
+        logger.error(" Current thread : '" + Thread.currentThread().getName()
+            + "' does not have permission to interrupt the Thread: '"
+            + hdfsCopyThread.getName() + "'");
+      }
       LogfeederHDFSUtil.INSTANCE.closeFileSystem(fileSystem);
     }
   }
 
-  public String getCurrentFileName() {
+  private String getCurrentFileName() {
     Date currentDate = new Date();
     String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
     String fileName = filenamePrefix + dateStr;
     return fileName;
   }
 
-  public HashMap<String, String> buildContextParam() {
+  private HashMap<String, String> buildContextParam() {
     HashMap<String, String> contextParam = new HashMap<String, String>();
     contextParam.put("host", LogFeederUtil.hostName);
     return contextParam;
   }
 
-  public void addFileInReadyList(File localFile) {
+  private void addFileInReadyList(File localFile) {
     localReadyFiles.add(localFile);
     try {
       synchronized (readyMonitor) {
         readyMonitor.notifyAll();
       }
-    } catch (Exception exception) {
-      // ignore
+    } catch (Exception e) {
+      logger.error(e.getLocalizedMessage(),e);
     }
   }