Bläddra i källkod

YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write more log-data than the log-length that it records. Contributed by Mit Desai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580005 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 11 år sedan
förälder
incheckning
2826a35469

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -536,6 +536,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1811. Fixed AMFilters in YARN to correctly accept requests from either
     web-app proxy or the RMs when HA is enabled. (Robert Kanter via vinodkv)
 
+    YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
+    more log-data than the log-length that it records. (Mit Desai via vinodk)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 12 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -182,20 +182,29 @@ public class AggregatedLogFormat {
         Arrays.sort(logFiles);
         for (File logFile : logFiles) {
 
+          long fileLength = 0;
+
           // Write the logFile Type
           out.writeUTF(logFile.getName());
 
           // Write the log length as UTF so that it is printable
-          out.writeUTF(String.valueOf(logFile.length()));
+          out.writeUTF(String.valueOf(fileLength = logFile.length()));
 
           // Write the log itself
           FileInputStream in = null;
           try {
             in = SecureIOUtils.openForRead(logFile, getUser(), null);
             byte[] buf = new byte[65535];
+            long curRead = 0;
             int len = 0;
-            while ((len = in.read(buf)) != -1) {
+            while ( ((len = in.read(buf)) != -1) && (curRead < fileLength) ) {
               out.write(buf, 0, len);
+              curRead += len;
+            }
+            long newLength = logFile.length();
+            if(fileLength < newLength) {
+              LOG.warn("Aggregated Logs Truncated by "+
+                  (newLength-fileLength) +" bytes.");
             }
           } catch (IOException e) {
             String message = "Error aggregating log file. Log file : "
@@ -553,7 +562,7 @@ public class AggregatedLogFormat {
       out.println(fileLengthStr);
       out.println("Log Contents:");
 
-      int curRead = 0;
+      long curRead = 0;
       long pendingRead = fileLength - curRead;
       int toRead =
                 pendingRead > buf.length ? buf.length : (int) pendingRead;

+ 91 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java

@@ -34,6 +34,7 @@ import java.io.UnsupportedEncodingException;
 import java.io.Writer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
 
 import org.junit.Assert;
 
@@ -87,6 +88,96 @@ public class TestAggregatedLogFormat {
     fs.delete(workDirPath, true);
   }
 
+  //Test for Corrupted AggregatedLogs. The Logs should not write more data
+  //if Logvalue.write() is called and the application is still
+  //appending to logs
+
+  @Test
+  public void testForCorruptedAggregatedLogs() throws Exception {
+    Configuration conf = new Configuration();
+    File workDir = new File(testWorkDir, "testReadAcontainerLogs1");
+    Path remoteAppLogFile =
+        new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
+    Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
+    ContainerId testContainerId = TestContainerId.newContainerId(1, 1, 1, 1);
+    Path t =
+        new Path(srcFileRoot, testContainerId.getApplicationAttemptId()
+            .getApplicationId().toString());
+    Path srcFilePath = new Path(t, testContainerId.toString());
+
+    long numChars = 950000;
+
+    writeSrcFileAndALog(srcFilePath, "stdout", numChars, remoteAppLogFile,
+       srcFileRoot, testContainerId);
+
+    LogReader logReader = new LogReader(conf, remoteAppLogFile);
+    LogKey rLogKey = new LogKey();
+    DataInputStream dis = logReader.next(rLogKey);
+    Writer writer = new StringWriter();
+    try {
+      LogReader.readAcontainerLogs(dis, writer);
+    } catch (Exception e) {
+      if(e.toString().contains("NumberFormatException")) {
+        Assert.fail("Aggregated logs are corrupted.");
+      }
+    }
+  }
+
+  private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long length,
+      Path remoteAppLogFile, Path srcFileRoot, ContainerId testContainerId)
+      throws Exception {
+    File dir = new File(srcFilePath.toString());
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Unable to create directory : " + dir);
+      }
+    }
+
+    File outputFile = new File(new File(srcFilePath.toString()), fileName);
+    FileOutputStream os = new FileOutputStream(outputFile);
+    final OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
+    final int ch = filler;
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
+
+    LogKey logKey = new LogKey(testContainerId);
+    LogValue logValue =
+        spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
+            testContainerId, ugi.getShortUserName()));
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          for(int i=0; i < length/3; i++) {
+              osw.write(ch);
+          }
+
+          latch.countDown();
+
+          for(int i=0; i < (2*length)/3; i++) {
+            osw.write(ch);
+          }
+          osw.close();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    };
+    t.start();
+
+    //Wait till the osw is partially written
+    //aggregation starts once the ows has completed 1/3rd of its work
+    latch.await();
+
+    //Aggregate The Logs
+    logWriter.append(logKey, logValue);
+    logWriter.close();
+  }
+
   //Verify the output generated by readAContainerLogs(DataInputStream, Writer)
   @Test
   public void testReadAcontainerLogs1() throws Exception {