Browse Source

MAPREDUCE-7047. Make HAR tool support IndexedLogAggregtionController. (Xuan Gong via wangda)

Change-Id: Ice5ae1c706f2476361997dcbb29f2c33c77d4f0c
Wangda Tan 7 năm trước cách đây
mục cha
commit
f47659fb97

+ 161 - 53
hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java

@@ -48,7 +48,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import java.io.File;
@@ -102,11 +103,14 @@ public class HadoopArchiveLogs implements Tool {
   @VisibleForTesting
   Set<AppInfo> eligibleApplications;
 
+  private Set<Path> workingDirs;
+
   private JobConf conf;
 
   public HadoopArchiveLogs(Configuration conf) {
     setConf(conf);
     eligibleApplications = new HashSet<>();
+    workingDirs = new HashSet<>();
   }
 
   public static void main(String[] args) {
@@ -138,47 +142,71 @@ public class HadoopArchiveLogs implements Tool {
     handleOpts(args);
 
     FileSystem fs = null;
-    Path remoteRootLogDir = new Path(conf.get(
-        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
-    Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
-    if (verbose) {
-      LOG.info("Remote Log Dir Root: " + remoteRootLogDir);
-      LOG.info("Log Suffix: " + suffix);
-      LOG.info("Working Dir: " + workingDir);
+
+    LogAggregationFileControllerFactory factory =
+        new LogAggregationFileControllerFactory(conf);
+    List<LogAggregationFileController> fileControllers = factory
+        .getConfiguredLogAggregationFileControllerList();
+    if (fileControllers == null || fileControllers.isEmpty()) {
+      LOG.info("Can not find any valid fileControllers.");
+      if (verbose) {
+        LOG.info("The configurated fileControllers:"
+            + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
+      }
+      return 0;
     }
     try {
       fs = FileSystem.get(conf);
-      if (prepareWorkingDir(fs, workingDir)) {
-
-        checkFilesAndSeedApps(fs, remoteRootLogDir, suffix);
+      // find eligibleApplications for all the fileControllers
+      int previousTotal = 0;
+      for (LogAggregationFileController fileController : fileControllers) {
+        Path remoteRootLogDir = fileController.getRemoteRootLogDir();
+        String suffix = fileController.getRemoteRootLogDirSuffix();
+        Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+        if (verbose) {
+          LOG.info("LogAggregationFileController:" + fileController
+              .getClass().getName());
+          LOG.info("Remote Log Dir Root: " + remoteRootLogDir);
+          LOG.info("Log Suffix: " + suffix);
+          LOG.info("Working Dir: " + workingDir);
+        }
+        checkFilesAndSeedApps(fs, remoteRootLogDir, suffix, workingDir);
 
         filterAppsByAggregatedStatus();
 
-        checkMaxEligible();
-
-        if (eligibleApplications.isEmpty()) {
-          LOG.info("No eligible applications to process");
-          exitCode = 0;
-        } else {
-          StringBuilder sb =
-              new StringBuilder("Will process the following applications:");
-          for (AppInfo app : eligibleApplications) {
-            sb.append("\n\t").append(app.getAppId());
-          }
-          LOG.info(sb.toString());
-
-          File localScript = File.createTempFile("hadoop-archive-logs-", ".sh");
-          generateScript(localScript, workingDir, remoteRootLogDir, suffix);
-
-          exitCode = runDistributedShell(localScript) ? 0 : 1;
+        if (eligibleApplications.size() > previousTotal) {
+          workingDirs.add(workingDir);
+          previousTotal = eligibleApplications.size();
+        }
+      }
+      checkMaxEligible();
+      if (workingDirs.isEmpty() || eligibleApplications.isEmpty()) {
+        LOG.info("No eligible applications to process");
+        return 0;
+      }
+      for (Path workingDir : workingDirs) {
+        if (!prepareWorkingDir(fs, workingDir)) {
+          LOG.error("Failed to create the workingDir:"
+              + workingDir.toString());
+          return 1;
         }
       }
+      StringBuilder sb =
+          new StringBuilder("Will process the following applications:");
+      for (AppInfo app : eligibleApplications) {
+        sb.append("\n\t").append(app.getAppId());
+      }
+      LOG.info(sb.toString());
+      File localScript = File.createTempFile("hadoop-archive-logs-", ".sh");
+      generateScript(localScript);
+
+      exitCode = runDistributedShell(localScript) ? 0 : 1;
     } finally {
       if (fs != null) {
         // Cleanup working directory
-        fs.delete(workingDir, true);
+        for (Path workingDir : workingDirs) {
+          fs.delete(workingDir, true);
+        }
         fs.close();
       }
     }
@@ -296,8 +324,8 @@ public class HadoopArchiveLogs implements Tool {
     try {
       client.init(getConf());
       client.start();
-      for (Iterator<AppInfo> it = eligibleApplications.iterator();
-           it.hasNext();) {
+      for (Iterator<AppInfo> it = eligibleApplications
+          .iterator(); it.hasNext();) {
         AppInfo app = it.next();
         try {
           ApplicationReport report = client.getApplicationReport(
@@ -335,7 +363,7 @@ public class HadoopArchiveLogs implements Tool {
 
   @VisibleForTesting
   void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir,
-       String suffix) throws IOException {
+       String suffix, Path workingDir) throws IOException {
     for (RemoteIterator<FileStatus> userIt =
          fs.listStatusIterator(remoteRootLogDir); userIt.hasNext();) {
       Path userLogPath = userIt.next().getPath();
@@ -375,8 +403,13 @@ public class HadoopArchiveLogs implements Tool {
                   LOG.info("Adding " + appLogPath.getName() + " for user " +
                       userLogPath.getName());
                 }
-                eligibleApplications.add(
-                    new AppInfo(appLogPath.getName(), userLogPath.getName()));
+                AppInfo context = new AppInfo();
+                context.setAppId(appLogPath.getName());
+                context.setUser(userLogPath.getName());
+                context.setSuffix(suffix);
+                context.setRemoteRootLogDir(remoteRootLogDir);
+                context.setWorkingDir(workingDir);
+                eligibleApplications.add(context);
               }
             } else {
               if (verbose) {
@@ -406,14 +439,17 @@ public class HadoopArchiveLogs implements Tool {
   @VisibleForTesting
   void checkMaxEligible() {
     // If we have too many eligible apps, remove the newest ones first
-    if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
+    if (maxEligible > 0 && eligibleApplications.size()
+        > maxEligible) {
       if (verbose) {
-        LOG.info("Too many applications (" + eligibleApplications.size() +
+        LOG.info("Too many applications (" + eligibleApplications
+            .size() +
             " > " + maxEligible + ")");
       }
       List<AppInfo> sortedApplications =
           new ArrayList<AppInfo>(eligibleApplications);
-      Collections.sort(sortedApplications, new Comparator<AppInfo>() {
+      Collections.sort(sortedApplications, new Comparator<
+          AppInfo>() {
         @Override
         public int compare(AppInfo o1, AppInfo o2) {
           int lCompare = Long.compare(o1.getFinishTime(), o2.getFinishTime());
@@ -440,20 +476,25 @@ public class HadoopArchiveLogs implements Tool {
   if [ "$YARN_SHELL_ID" == "1" ]; then
         appId="application_1440448768987_0001"
         user="rkanter"
+        workingDir="/tmp/logs/archive-logs-work"
+        remoteRootLogDir="/tmp/logs"
+        suffix="logs"
   elif [ "$YARN_SHELL_ID" == "2" ]; then
         appId="application_1440448768987_0002"
         user="rkanter"
+        workingDir="/tmp/logs/archive-logs-work"
+        remoteRootLogDir="/tmp/logs"
+        suffix="logs"
   else
         echo "Unknown Mapping!"
         exit 1
   fi
   export HADOOP_CLIENT_OPTS="-Xmx1024m"
   export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar
-  "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs
+  "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir "$workingDir" -remoteRootLogDir "$remoteRootLogDir" -suffix "$suffix"
    */
   @VisibleForTesting
-  void generateScript(File localScript, Path workingDir,
-        Path remoteRootLogDir, String suffix) throws IOException {
+  void generateScript(File localScript) throws IOException {
     if (verbose) {
       LOG.info("Generating script at: " + localScript.getAbsolutePath());
     }
@@ -467,13 +508,19 @@ public class HadoopArchiveLogs implements Tool {
       fw = new FileWriterWithEncoding(localScript, "UTF-8");
       fw.write("#!/bin/bash\nset -e\nset -x\n");
       int containerCount = 1;
-      for (AppInfo app : eligibleApplications) {
+      for (AppInfo context : eligibleApplications) {
         fw.write("if [ \"$YARN_SHELL_ID\" == \"");
         fw.write(Integer.toString(containerCount));
         fw.write("\" ]; then\n\tappId=\"");
-        fw.write(app.getAppId());
+        fw.write(context.getAppId());
         fw.write("\"\n\tuser=\"");
-        fw.write(app.getUser());
+        fw.write(context.getUser());
+        fw.write("\"\n\tworkingDir=\"");
+        fw.write(context.getWorkingDir().toString());
+        fw.write("\"\n\tremoteRootLogDir=\"");
+        fw.write(context.getRemoteRootLogDir().toString());
+        fw.write("\"\n\tsuffix=\"");
+        fw.write(context.getSuffix());
         fw.write("\"\nel");
         containerCount++;
       }
@@ -486,11 +533,11 @@ public class HadoopArchiveLogs implements Tool {
       fw.write("\n\"$HADOOP_HOME\"/bin/hadoop ");
       fw.write(HadoopArchiveLogsRunner.class.getName());
       fw.write(" -appId \"$appId\" -user \"$user\" -workingDir ");
-      fw.write(workingDir.toString());
+      fw.write("\"$workingDir\"");
       fw.write(" -remoteRootLogDir ");
-      fw.write(remoteRootLogDir.toString());
+      fw.write("\"$remoteRootLogDir\"");
       fw.write(" -suffix ");
-      fw.write(suffix);
+      fw.write("\"$suffix\"");
       if (!proxy) {
         fw.write(" -noProxy\n");
       }
@@ -542,23 +589,59 @@ public class HadoopArchiveLogs implements Tool {
   @VisibleForTesting
   static class AppInfo {
     private String appId;
+    private Path remoteRootLogDir;
+    private String suffix;
+    private Path workingDir;
     private String user;
     private long finishTime;
 
+    AppInfo() {}
+
     AppInfo(String appId, String user) {
-      this.appId = appId;
-      this.user = user;
-      this.finishTime = 0L;
+      this.setAppId(appId);
+      this.setUser(user);
     }
 
     public String getAppId() {
       return appId;
     }
 
+    public void setAppId(String appId) {
+      this.appId = appId;
+    }
+
+    public Path getRemoteRootLogDir() {
+      return remoteRootLogDir;
+    }
+
+    public void setRemoteRootLogDir(Path remoteRootLogDir) {
+      this.remoteRootLogDir = remoteRootLogDir;
+    }
+
+    public String getSuffix() {
+      return suffix;
+    }
+
+    public void setSuffix(String suffix) {
+      this.suffix = suffix;
+    }
+
+    public Path getWorkingDir() {
+      return workingDir;
+    }
+
+    public void setWorkingDir(Path workingDir) {
+      this.workingDir = workingDir;
+    }
+
     public String getUser() {
       return user;
     }
 
+    public void setUser(String user) {
+      this.user = user;
+    }
+
     public long getFinishTime() {
       return finishTime;
     }
@@ -582,14 +665,39 @@ public class HadoopArchiveLogs implements Tool {
           ? !appId.equals(appInfo.appId) : appInfo.appId != null) {
         return false;
       }
-      return !(user != null
-          ? !user.equals(appInfo.user) : appInfo.user != null);
+
+      if (user != null
+          ? !user.equals(appInfo.user) : appInfo.user != null) {
+        return false;
+      }
+
+      if (suffix != null
+          ? !suffix.equals(appInfo.suffix) : appInfo.suffix != null) {
+        return false;
+      }
+
+      if (workingDir != null ? !workingDir.equals(
+          appInfo.workingDir) : appInfo.workingDir != null) {
+        return false;
+      }
+
+      if (remoteRootLogDir != null ? !remoteRootLogDir.equals(
+          appInfo.remoteRootLogDir) : appInfo.remoteRootLogDir != null) {
+        return false;
+      }
+
+      return Long.compare(finishTime, appInfo.finishTime) == 0;
     }
 
     @Override
     public int hashCode() {
       int result = appId != null ? appId.hashCode() : 0;
       result = 31 * result + (user != null ? user.hashCode() : 0);
+      result = 31 * result + (suffix != null ? suffix.hashCode() : 0);
+      result = 31 * result + (workingDir != null ? workingDir.hashCode() : 0);
+      result = 31 * result + (remoteRootLogDir != null ?
+          remoteRootLogDir.hashCode() : 0);
+      result = 31 * result + Long.valueOf(finishTime).hashCode();
       return result;
     }
   }

+ 56 - 26
hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java

@@ -95,7 +95,8 @@ public class TestHadoopArchiveLogs {
     createFile(fs, new Path(app5Path, "file2"), 3);
 
     Assert.assertEquals(0, hal.eligibleApplications.size());
-    hal.checkFilesAndSeedApps(fs, rootLogDir, suffix);
+    hal.checkFilesAndSeedApps(fs, rootLogDir, suffix, new Path(rootLogDir,
+        "archive-logs-work"));
     Assert.assertEquals(1, hal.eligibleApplications.size());
     Assert.assertEquals(appId5.toString(),
         hal.eligibleApplications.iterator().next().getAppId());
@@ -249,59 +250,88 @@ public class TestHadoopArchiveLogs {
   private void _testGenerateScript(boolean proxy) throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
+    Path workingDir = new Path("/tmp", "working");
+    Path remoteRootLogDir = new Path("/tmp", "logs");
+    String suffix = "logs";
     ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+    HadoopArchiveLogs.AppInfo appInfo1 = new HadoopArchiveLogs.AppInfo(
+        app1.toString(), USER);
+    appInfo1.setSuffix(suffix);
+    appInfo1.setRemoteRootLogDir(remoteRootLogDir);
+    appInfo1.setWorkingDir(workingDir);
+    hal.eligibleApplications.add(appInfo1);
     ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
-    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(),
-        USER));
-    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
-        USER));
+    Path workingDir2 = new Path("/tmp", "working2");
+    Path remoteRootLogDir2 = new Path("/tmp", "logs2");
+    String suffix2 = "logs2";
+    HadoopArchiveLogs.AppInfo appInfo2 = new HadoopArchiveLogs.AppInfo(
+        app2.toString(), USER);
+    appInfo2.setSuffix(suffix2);
+    appInfo2.setRemoteRootLogDir(remoteRootLogDir2);
+    appInfo2.setWorkingDir(workingDir2);
+    hal.eligibleApplications.add(appInfo2);
     hal.proxy = proxy;
 
     File localScript = new File("target", "script.sh");
-    Path workingDir = new Path("/tmp", "working");
-    Path remoteRootLogDir = new Path("/tmp", "logs");
-    String suffix = "logs";
     localScript.delete();
     Assert.assertFalse(localScript.exists());
-    hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix);
+    hal.generateScript(localScript);
     Assert.assertTrue(localScript.exists());
     String script = IOUtils.toString(localScript.toURI());
     String[] lines = script.split(System.lineSeparator());
-    Assert.assertEquals(16, lines.length);
+    Assert.assertEquals(22, lines.length);
     Assert.assertEquals("#!/bin/bash", lines[0]);
     Assert.assertEquals("set -e", lines[1]);
     Assert.assertEquals("set -x", lines[2]);
     Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
+    boolean oneBefore = true;
     if (lines[4].contains(app1.toString())) {
       Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]);
-      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]);
+      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[10]);
     } else {
+      oneBefore = false;
       Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]);
-      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]);
+      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[10]);
     }
     Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]);
-    Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
-    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]);
-    Assert.assertEquals("else", lines[9]);
-    Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
-    Assert.assertEquals("\texit 1", lines[11]);
-    Assert.assertEquals("fi", lines[12]);
-    Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
-    Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
+    Assert.assertEquals("\tworkingDir=\"" + (oneBefore ? workingDir.toString()
+        : workingDir2.toString()) + "\"", lines[6]);
+    Assert.assertEquals("\tremoteRootLogDir=\"" + (oneBefore
+        ? remoteRootLogDir.toString() : remoteRootLogDir2.toString())
+        + "\"", lines[7]);
+    Assert.assertEquals("\tsuffix=\"" + (oneBefore ? suffix : suffix2)
+        + "\"", lines[8]);
+    Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then",
+        lines[9]);
+    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[11]);
+    Assert.assertEquals("\tworkingDir=\"" + (oneBefore
+        ? workingDir2.toString() : workingDir.toString()) + "\"",
+        lines[12]);
+    Assert.assertEquals("\tremoteRootLogDir=\"" + (oneBefore
+        ? remoteRootLogDir2.toString() : remoteRootLogDir.toString())
+        + "\"", lines[13]);
+    Assert.assertEquals("\tsuffix=\"" + (oneBefore ? suffix2 : suffix)
+        + "\"", lines[14]);
+    Assert.assertEquals("else", lines[15]);
+    Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[16]);
+    Assert.assertEquals("\texit 1", lines[17]);
+    Assert.assertEquals("fi", lines[18]);
+    Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[19]);
+    Assert.assertTrue(lines[20].startsWith("export HADOOP_CLASSPATH="));
     if (proxy) {
       Assert.assertEquals(
           "\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." +
               "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
-              "-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
-              remoteRootLogDir.toString() + " -suffix " + suffix,
-          lines[15]);
+              "-workingDir \"$workingDir\" -remoteRootLogDir " +
+              "\"$remoteRootLogDir\" -suffix \"$suffix\"",
+          lines[21]);
     } else {
       Assert.assertEquals(
           "\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." +
               "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
-              "-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
-              remoteRootLogDir.toString() + " -suffix " + suffix + " -noProxy",
-          lines[15]);
+              "-workingDir \"$workingDir\" -remoteRootLogDir " +
+              "\"$remoteRootLogDir\" -suffix \"$suffix\" -noProxy",
+          lines[21]);
     }
   }