Sfoglia il codice sorgente

YARN-1670. aggregated log writer can write more log data than it says is the log length (Mit Desai via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1581009 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Turner Eagles 11 anni fa
parent
commit
fc82850a0b

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -30,6 +30,9 @@ Release 0.23.11 - UNRELEASED
     YARN-500. Fixed YARN webapps to not roll-over ports when explicitly asked
     to use non-ephemeral ports. (Kenji Kikushima via vinodkv)
 
+    YARN-1670. aggregated log writer can write more log data than it says is
+    the log length (Mit Desai via jeagles)
+
 Release 0.23.10 - 2013-12-09
 
   INCOMPATIBLE CHANGES

+ 20 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -169,11 +169,13 @@ public class AggregatedLogFormat {
         Arrays.sort(logFiles);
         for (File logFile : logFiles) {
 
+          final long fileLength = logFile.length();
+
           // Write the logFile Type
           out.writeUTF(logFile.getName());
 
           // Write the log length as UTF so that it is printable
-          out.writeUTF(String.valueOf(logFile.length()));
+          out.writeUTF(String.valueOf(fileLength));
 
           // Write the log itself
           FileInputStream in = null;
@@ -181,8 +183,23 @@ public class AggregatedLogFormat {
             in = new FileInputStream(logFile);
             byte[] buf = new byte[65535];
             int len = 0;
+            long bytesLeft = fileLength;
             while ((len = in.read(buf)) != -1) {
-              out.write(buf, 0, len);
+              //If buffer contents within fileLength, write
+              if (len <= bytesLeft) {
+                out.write(buf, 0, len);
+                bytesLeft -= len;
+              }
+              //else only write contents that are within fileLength, then exit early
+              else {
+                out.write(buf, 0, (int)(bytesLeft));
+                break;
+              }
+            }
+            long newLength = logFile.length();
+            if(fileLength < newLength) {
+              LOG.warn("Aggregated logs truncated by approximately " +
+                  (newLength-fileLength) + " bytes.");
             }
           } finally {
             if (in != null) {
@@ -523,7 +540,7 @@ public class AggregatedLogFormat {
       out.println(fileLengthStr);
       out.println("Log Contents:");
 
-      int curRead = 0;
+      long curRead = 0;
       long pendingRead = fileLength - curRead;
       int toRead =
                 pendingRead > buf.length ? buf.length : (int) pendingRead;

+ 91 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java

@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
 import java.io.StringWriter;
 import java.io.Writer;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 
 import junit.framework.Assert;
 
@@ -73,6 +74,96 @@ public class TestAggregatedLogFormat {
     fs.delete(workDirPath, true);
   }
 
+  //Test for corrupted aggregated logs. LogValue.write() should not write more
+  //data than the recorded log length, even if the application is still
+  //appending to the log file while aggregation runs.
+
+  @Test
+  public void testForCorruptedAggregatedLogs() throws Exception {
+    Configuration conf = new Configuration();
+    File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
+    Path remoteAppLogFile =
+        new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
+    Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
+    ContainerId testContainerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+    Path t =
+        new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
+            .getApplicationId().toString());
+    Path srcFilePath = new Path(t, testContainerId.toString());
+
+    long numChars = 950000;
+
+    writeSrcFileAndALog(srcFilePath, "stdout", numChars, remoteAppLogFile,
+       srcFileRoot, testContainerId);
+
+    LogReader logReader = new LogReader(conf, remoteAppLogFile);
+    LogKey rLogKey = new LogKey();
+    DataInputStream dis = logReader.next(rLogKey);
+    Writer writer = new StringWriter();
+    try {
+      LogReader.readAcontainerLogs(dis, writer);
+    } catch (Exception e) {
+      if(e.toString().contains("NumberFormatException")) {
+        Assert.fail("Aggregated logs are corrupted.");
+      }
+    }
+  }
+
+  private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,
+      Path remoteAppLogFile, Path srcFileRoot, ContainerId testContainerId)
+      throws Exception {
+    File dir = new File(srcFilePath.toString());
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Unable to create directory : " + dir);
+      }
+    }
+
+    File outputFile = new File(new File(srcFilePath.toString()), fileName);
+    FileOutputStream os = new FileOutputStream(outputFile);
+    final OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
+    final int ch = filler;
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
+
+    LogKey logKey = new LogKey(testContainerId);
+    LogValue logValue =
+        new LogValue(Collections.singletonList(srcFileRoot.toString()),
+            testContainerId);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          for(int i=0; i < length/3; i++) {
+              osw.write(ch);
+          }
+
+          latch.countDown();
+
+          for(int i=0; i < (2*length)/3; i++) {
+            osw.write(ch);
+          }
+          osw.close();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    };
+    t.start();
+
+    //Wait until the osw is partially written;
+    //aggregation starts once the osw has completed 1/3rd of its work
+    latch.await();
+
+    //Aggregate The Logs
+    logWriter.append(logKey, logValue);
+    logWriter.close();
+  }
+
   //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
   @Test
   public void testReadAcontainerLogs1() throws Exception {