ソースを参照

YARN-7259. Add size-based rolling policy to LogAggregationIndexedFileController. (xgong via wangda)

Change-Id: Ifaf82c0aee6b73b9b6ebf103aa72e131e3942f31
Wangda Tan 7 年 前
コミット
280080fad0

+ 6 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/IndexedFileAggregatedLogsBlock.java

@@ -101,10 +101,9 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
       return;
     }
 
-    Map<String, FileStatus> checkSumFiles;
+    Map<String, Long> checkSumFiles;
     try {
-      checkSumFiles = fileController.filterFiles(nodeFiles,
-          LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
+      checkSumFiles = fileController.parseCheckSumFiles(nodeFiles);
     } catch (IOException ex) {
       LOG.error("Error getting logs for " + logEntity, ex);
       html.h1("Error getting logs for " + logEntity);
@@ -125,12 +124,11 @@ public class IndexedFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
     String desiredLogType = $(CONTAINER_LOG_TYPE);
     try {
       for (FileStatus thisNodeFile : fileToRead) {
-        FileStatus checkSum = fileController.getAllChecksumFiles(
-            checkSumFiles, thisNodeFile.getPath().getName());
+        Long checkSumIndex = checkSumFiles.get(
+            thisNodeFile.getPath().getName());
         long endIndex = -1;
-        if (checkSum != null) {
-          endIndex = fileController.loadIndexedLogsCheckSum(
-             checkSum.getPath());
+        if (checkSumIndex != null) {
+          endIndex = checkSumIndex.longValue();
         }
         IndexedLogsMeta indexedLogsMeta = null;
         try {

+ 273 - 124
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java

@@ -29,6 +29,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,7 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -54,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.HarFs;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils;
@@ -77,6 +79,8 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.View.ViewContext;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
@@ -102,7 +106,8 @@ public class LogAggregationIndexedFileController
       "indexedFile.fs.op.num-retries";
   private static final String FS_RETRY_INTERVAL_MS_ATTR =
       "indexedFile.fs.retry-interval-ms";
-  private static final int UUID_LENGTH = 36;
+  private static final String LOG_ROLL_OVER_MAX_FILE_SIZE_GB =
+      "indexedFile.log.roll-over.max-file-size-gb";
 
   @VisibleForTesting
   public static final String CHECK_SUM_FILE_SUFFIX = "-checksum";
@@ -121,7 +126,10 @@ public class LogAggregationIndexedFileController
   private Path remoteLogCheckSumFile;
   private FileContext fc;
   private UserGroupInformation ugi;
-  private String uuid = null;
+  private byte[] uuid = null;
+  private final int UUID_LENGTH = 32;
+  private long logRollOverMaxFileSize;
+  private Clock sysClock;
 
   public LogAggregationIndexedFileController() {}
 
@@ -164,6 +172,8 @@ public class LogAggregationIndexedFileController
         compressName);
     this.fsNumRetries = conf.getInt(FS_NUM_RETRIES_ATTR, 3);
     this.fsRetryInterval = conf.getLong(FS_RETRY_INTERVAL_MS_ATTR, 1000L);
+    this.logRollOverMaxFileSize = getRollOverLogMaxSize(conf);
+    this.sysClock = getSystemClock();
   }
 
   @Override
@@ -173,11 +183,12 @@ public class LogAggregationIndexedFileController
     final UserGroupInformation userUgi = context.getUserUgi();
     final Map<ApplicationAccessType, String> appAcls = context.getAppAcls();
     final String nodeId = context.getNodeId().toString();
+    final ApplicationId appId = context.getAppId();
     final Path remoteLogFile = context.getRemoteNodeLogFileForApp();
     this.ugi = userUgi;
     logAggregationSuccessfullyInThisCyCle = false;
     logsMetaInThisCycle = new IndexedPerAggregationLogMeta();
-    logAggregationTimeInThisCycle = System.currentTimeMillis();
+    logAggregationTimeInThisCycle = this.sysClock.getTime();
     logsMetaInThisCycle.setUploadTimeStamp(logAggregationTimeInThisCycle);
     logsMetaInThisCycle.setRemoteNodeFile(remoteLogFile.getName());
     try {
@@ -187,57 +198,6 @@ public class LogAggregationIndexedFileController
           fc = FileContext.getFileContext(
               remoteRootLogDir.toUri(), conf);
           fc.setUMask(APP_LOG_FILE_UMASK);
-          boolean fileExist = fc.util().exists(remoteLogFile);
-          if (fileExist && context.isLogAggregationInRolling()) {
-            fsDataOStream = fc.create(remoteLogFile,
-                EnumSet.of(CreateFlag.APPEND),
-                new Options.CreateOpts[] {});
-            if (uuid == null) {
-              FSDataInputStream fsDataInputStream = null;
-              try {
-                fsDataInputStream = fc.open(remoteLogFile);
-                byte[] b = new byte[UUID_LENGTH];
-                int actual = fsDataInputStream.read(b);
-                if (actual != UUID_LENGTH) {
-                  // Get an error when parse the UUID from existed log file.
-                  // Simply OverWrite the existed log file and re-create the
-                  // UUID.
-                  fsDataOStream = fc.create(remoteLogFile,
-                      EnumSet.of(CreateFlag.OVERWRITE),
-                          new Options.CreateOpts[] {});
-                  uuid = UUID.randomUUID().toString();
-                  fsDataOStream.write(uuid.getBytes(Charset.forName("UTF-8")));
-                  fsDataOStream.flush();
-                } else {
-                  uuid = new String(b, Charset.forName("UTF-8"));
-                }
-              } finally {
-                IOUtils.cleanupWithLogger(LOG, fsDataInputStream);
-              }
-            }
-            // if the remote log file exists, but we do not have any
-            // indexedLogsMeta. We need to re-load indexedLogsMeta from
-            // the existing remote log file. If the re-load fails, we simply
-            // re-create a new indexedLogsMeta object. And will re-load
-            // the indexedLogsMeta from checksum file later.
-            if (indexedLogsMeta == null) {
-              try {
-                indexedLogsMeta = loadIndexedLogsMeta(remoteLogFile);
-              } catch (IOException ex) {
-                // DO NOTHING
-              }
-            }
-          } else {
-            fsDataOStream = fc.create(remoteLogFile,
-                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
-                new Options.CreateOpts[] {});
-            if (uuid == null) {
-              uuid = UUID.randomUUID().toString();
-            }
-            byte[] b = uuid.getBytes(Charset.forName("UTF-8"));
-            fsDataOStream.write(b);
-            fsDataOStream.flush();
-          }
           if (indexedLogsMeta == null) {
             indexedLogsMeta = new IndexedLogsMeta();
             indexedLogsMeta.setVersion(VERSION);
@@ -249,44 +209,24 @@ public class LogAggregationIndexedFileController
                 YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
             indexedLogsMeta.setCompressName(compressName);
           }
-          final long currentAggregatedLogFileLength = fc
-              .getFileStatus(remoteLogFile).getLen();
-          // only check the check-sum file when we are in append mode
+          Path aggregatedLogFile = null;
           if (context.isLogAggregationInRolling()) {
-            // check whether the checksum file exists to figure out
-            // whether the previous log aggregation process is successful
-            // and the aggregated log file is corrupted or not.
-            remoteLogCheckSumFile = new Path(remoteLogFile.getParent(),
-                (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX));
-            boolean exist = fc.util().exists(remoteLogCheckSumFile);
-            if (!exist) {
-              FSDataOutputStream checksumFileOutputStream = null;
-              try {
-                checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
-                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
-                    new Options.CreateOpts[] {});
-                checksumFileOutputStream.writeLong(
-                    currentAggregatedLogFileLength);
-              } finally {
-                IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
-              }
-            } else {
-              FSDataInputStream checksumFileInputStream = null;
-              try {
-                checksumFileInputStream = fc.open(remoteLogCheckSumFile);
-                long endIndex = checksumFileInputStream.readLong();
-                IndexedLogsMeta recoveredLogsMeta = loadIndexedLogsMeta(
-                    remoteLogFile, endIndex);
-                if (recoveredLogsMeta == null) {
-                  indexedLogsMeta.getLogMetas().clear();
-                } else {
-                  indexedLogsMeta = recoveredLogsMeta;
-                }
-              } finally {
-                IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
-              }
+            aggregatedLogFile = initializeWriterInRolling(
+                remoteLogFile, appId, nodeId);
+          } else {
+            aggregatedLogFile = remoteLogFile;
+            fsDataOStream = fc.create(remoteLogFile,
+                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+                new Options.CreateOpts[] {});
+            if (uuid == null) {
+              uuid = createUUID(appId);
             }
+            fsDataOStream.write(uuid);
+            fsDataOStream.flush();
           }
+
+          long aggregatedLogFileLength = fc.getFileStatus(
+              aggregatedLogFile).getLen();
           // append a simple character("\n") to move the writer cursor, so
           // we could get the correct position when we call
           // fsOutputStream.getStartPos()
@@ -294,11 +234,11 @@ public class LogAggregationIndexedFileController
           fsDataOStream.write(dummyBytes);
           fsDataOStream.flush();
 
-          if (fsDataOStream.getPos() >= (currentAggregatedLogFileLength
+          if (fsDataOStream.getPos() >= (aggregatedLogFileLength
               + dummyBytes.length)) {
             currentOffSet = 0;
           } else {
-            currentOffSet = currentAggregatedLogFileLength;
+            currentOffSet = aggregatedLogFileLength;
           }
           return null;
         }
@@ -308,6 +248,104 @@ public class LogAggregationIndexedFileController
     }
   }
 
+  private Path initializeWriterInRolling(final Path remoteLogFile,
+      final ApplicationId appId, final String nodeId) throws Exception {
+    Path aggregatedLogFile = null;
+    // check uuid
+    // if we can not find uuid, we would load the uuid
+    // from previous aggregated log files, and at the same
+    // time, we would delete any aggregated log files which
+    // has invalid uuid.
+    if (uuid == null) {
+      uuid = loadUUIDFromLogFile(fc, remoteLogFile.getParent(),
+            appId, nodeId);
+    }
+    Path currentRemoteLogFile = getCurrentRemoteLogFile(
+        fc, remoteLogFile.getParent(), nodeId);
+    // check checksum file
+    boolean overwriteCheckSum = true;
+    remoteLogCheckSumFile = new Path(remoteLogFile.getParent(),
+        (remoteLogFile.getName() + CHECK_SUM_FILE_SUFFIX));
+    if(fc.util().exists(remoteLogCheckSumFile)) {
+      // if the checksum file exists, we should reset cached
+      // indexedLogsMeta.
+      indexedLogsMeta.getLogMetas().clear();
+      if (currentRemoteLogFile != null) {
+        FSDataInputStream checksumFileInputStream = null;
+        try {
+          checksumFileInputStream = fc.open(remoteLogCheckSumFile);
+          int nameLength = checksumFileInputStream.readInt();
+          byte[] b = new byte[nameLength];
+          int actualLength = checksumFileInputStream.read(b);
+          if (actualLength == nameLength) {
+            String recoveredLogFile = new String(
+                b, Charset.forName("UTF-8"));
+            if (recoveredLogFile.equals(
+                currentRemoteLogFile.getName())) {
+              overwriteCheckSum = false;
+              long endIndex = checksumFileInputStream.readLong();
+              IndexedLogsMeta recoveredLogsMeta = null;
+              try {
+                truncateFileWithRetries(fc, currentRemoteLogFile,
+                    endIndex);
+                recoveredLogsMeta = loadIndexedLogsMeta(
+                    currentRemoteLogFile);
+              } catch (Exception ex) {
+                recoveredLogsMeta = loadIndexedLogsMeta(
+                    currentRemoteLogFile, endIndex);
+              }
+              if (recoveredLogsMeta != null) {
+                indexedLogsMeta = recoveredLogsMeta;
+              }
+            }
+          }
+        } finally {
+          IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
+        }
+      }
+    }
+    // check whether we need roll over old logs
+    if (currentRemoteLogFile == null || isRollover(
+        fc, currentRemoteLogFile)) {
+      indexedLogsMeta.getLogMetas().clear();
+      overwriteCheckSum = true;
+      aggregatedLogFile = new Path(remoteLogFile.getParent(),
+          remoteLogFile.getName() + "_" + sysClock.getTime());
+      fsDataOStream = fc.create(aggregatedLogFile,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+          new Options.CreateOpts[] {});
+      // writes the uuid
+      fsDataOStream.write(uuid);
+      fsDataOStream.flush();
+    } else {
+      aggregatedLogFile = currentRemoteLogFile;
+      fsDataOStream = fc.create(currentRemoteLogFile,
+          EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
+          new Options.CreateOpts[] {});
+    }
+    // recreate checksum file if needed before aggregate the logs
+    if (overwriteCheckSum) {
+      final long currentAggregatedLogFileLength = fc
+          .getFileStatus(aggregatedLogFile).getLen();
+      FSDataOutputStream checksumFileOutputStream = null;
+      try {
+        checksumFileOutputStream = fc.create(remoteLogCheckSumFile,
+            EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+            new Options.CreateOpts[] {});
+        String fileName = aggregatedLogFile.getName();
+        checksumFileOutputStream.writeInt(fileName.length());
+        checksumFileOutputStream.write(fileName.getBytes(
+            Charset.forName("UTF-8")));
+        checksumFileOutputStream.writeLong(
+            currentAggregatedLogFileLength);
+        checksumFileOutputStream.flush();
+      } finally {
+        IOUtils.cleanupWithLogger(LOG, checksumFileOutputStream);
+      }
+    }
+    return aggregatedLogFile;
+  }
+
   @Override
   public void closeWriter() {
     IOUtils.cleanupWithLogger(LOG, this.fsDataOStream);
@@ -390,8 +428,7 @@ public class LogAggregationIndexedFileController
     this.fsDataOStream.write(b);
     int length = b.length;
     this.fsDataOStream.writeInt(length);
-    byte[] separator = this.uuid.getBytes(Charset.forName("UTF-8"));
-    this.fsDataOStream.write(separator);
+    this.fsDataOStream.write(uuid);
     if (logAggregationSuccessfullyInThisCyCle &&
         record.isLogAggregationInRolling()) {
       deleteFileWithRetries(fc, ugi, remoteLogCheckSumFile);
@@ -410,6 +447,30 @@ public class LogAggregationIndexedFileController
     }.runWithRetries();
   }
 
+  private void deleteFileWithRetries(final FileContext fileContext,
+      final Path deletePath) throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        if (fileContext.util().exists(deletePath)) {
+          fileContext.delete(deletePath, false);
+        }
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private void truncateFileWithRetries(final FileContext fileContext,
+      final Path truncatePath, final long newLength) throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        fileContext.truncate(truncatePath, newLength);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
   private Object deleteFileWithPrivilege(final FileContext fileContext,
       final UserGroupInformation userUgi, final Path fileToDelete)
       throws Exception {
@@ -449,18 +510,16 @@ public class LogAggregationIndexedFileController
       throw new IOException("There is no available log fils for "
           + "application:" + appId);
     }
-    Map<String, FileStatus> checkSumFiles = filterFiles(
-        nodeFiles, CHECK_SUM_FILE_SUFFIX);
+    Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
     List<FileStatus> fileToRead = getNodeLogFileToRead(
         nodeFiles, nodeIdStr, appId);
     byte[] buf = new byte[65535];
     for (FileStatus thisNodeFile : fileToRead) {
       String nodeName = thisNodeFile.getPath().getName();
-      FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
-          thisNodeFile.getPath().getName());
+      Long checkSumIndex = checkSumFiles.get(nodeName);
       long endIndex = -1;
-      if (checkSum != null) {
-        endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
+      if (checkSumIndex != null) {
+        endIndex = checkSumIndex.longValue();
       }
       IndexedLogsMeta indexedLogsMeta = null;
       try {
@@ -565,17 +624,16 @@ public class LogAggregationIndexedFileController
       throw new IOException("There is no available log fils for "
           + "application:" + appId);
     }
-    Map<String, FileStatus> checkSumFiles = filterFiles(
-        nodeFiles, CHECK_SUM_FILE_SUFFIX);
+    Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
     List<FileStatus> fileToRead = getNodeLogFileToRead(
         nodeFiles, nodeIdStr, appId);
     for(FileStatus thisNodeFile : fileToRead) {
       try {
-        FileStatus checkSum = getAllChecksumFiles(checkSumFiles,
+        Long checkSumIndex = checkSumFiles.get(
             thisNodeFile.getPath().getName());
         long endIndex = -1;
-        if (checkSum != null) {
-          endIndex = loadIndexedLogsCheckSum(checkSum.getPath());
+        if (checkSumIndex != null) {
+          endIndex = checkSumIndex.longValue();
         }
         IndexedLogsMeta current = loadIndexedLogsMeta(
             thisNodeFile.getPath(), endIndex);
@@ -627,21 +685,46 @@ public class LogAggregationIndexedFileController
   }
 
   @Private
-  public Map<String, FileStatus> filterFiles(
-      List<FileStatus> fileList, final String suffix) throws IOException {
-    Map<String, FileStatus> checkSumFiles = new HashMap<>();
+  public Map<String, Long> parseCheckSumFiles(
+      List<FileStatus> fileList) throws IOException {
+    Map<String, Long> checkSumFiles = new HashMap<>();
     Set<FileStatus> status = new HashSet<FileStatus>(fileList);
     Iterable<FileStatus> mask =
         Iterables.filter(status, new Predicate<FileStatus>() {
           @Override
           public boolean apply(FileStatus next) {
             return next.getPath().getName().endsWith(
-                suffix);
+                CHECK_SUM_FILE_SUFFIX);
           }
         });
     status = Sets.newHashSet(mask);
+    FileContext fc = null;
     for (FileStatus file : status) {
-      checkSumFiles.put(file.getPath().getName(), file);
+      FSDataInputStream checksumFileInputStream = null;
+      try {
+        if (fc == null) {
+          fc = FileContext.getFileContext(file.getPath().toUri(), conf);
+        }
+        String nodeName = null;
+        long index = 0L;
+        checksumFileInputStream = fc.open(file.getPath());
+        int nameLength = checksumFileInputStream.readInt();
+        byte[] b = new byte[nameLength];
+        int actualLength = checksumFileInputStream.read(b);
+        if (actualLength == nameLength) {
+          nodeName = new String(b, Charset.forName("UTF-8"));
+          index = checksumFileInputStream.readLong();
+        } else {
+          continue;
+        }
+        if (nodeName != null && !nodeName.isEmpty()) {
+          checkSumFiles.put(nodeName, Long.valueOf(index));
+        }
+      } catch (IOException ex) {
+        continue;
+      } finally {
+        IOUtils.cleanupWithLogger(LOG, checksumFileInputStream);
+      }
     }
     return checkSumFiles;
   }
@@ -755,20 +838,6 @@ public class LogAggregationIndexedFileController
     return loadIndexedLogsMeta(remoteLogPath, -1);
   }
 
-  @Private
-  public long loadIndexedLogsCheckSum(Path remoteLogCheckSumPath)
-      throws IOException {
-    FileContext fileContext =
-        FileContext.getFileContext(remoteLogCheckSumPath.toUri(), conf);
-    FSDataInputStream fsDataIStream = null;
-    try {
-      fsDataIStream = fileContext.open(remoteLogCheckSumPath);
-      return fsDataIStream.readLong();
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, fsDataIStream);
-    }
-  }
-
   /**
    * This IndexedLogsMeta includes all the meta information
    * for the aggregated log file.
@@ -1034,6 +1103,13 @@ public class LogAggregationIndexedFileController
     return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024);
   }
 
+  @Private
+  @VisibleForTesting
+  public long getRollOverLogMaxSize(Configuration conf) {
+    return 1024L * 1024 * 1024 * conf.getInt(
+        LOG_ROLL_OVER_MAX_FILE_SIZE_GB, 10);
+  }
+
   private abstract class FSAction<T> {
     abstract T run() throws Exception;
 
@@ -1054,4 +1130,77 @@ public class LogAggregationIndexedFileController
       }
     }
   }
+
+  private Path getCurrentRemoteLogFile(final FileContext fc,
+      final Path parent, final String nodeId) throws IOException {
+    RemoteIterator<FileStatus> files = fc.listStatus(parent);
+    long maxTime = 0L;
+    Path returnPath = null;
+    while(files.hasNext()) {
+      FileStatus candidate = files.next();
+      String fileName = candidate.getPath().getName();
+      if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
+          && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX) &&
+          !fileName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
+        if (candidate.getModificationTime() > maxTime) {
+          maxTime = candidate.getModificationTime();
+          returnPath = candidate.getPath();
+        }
+      }
+    }
+    return returnPath;
+  }
+
+  private byte[] loadUUIDFromLogFile(final FileContext fc,
+      final Path parent, final ApplicationId appId, final String nodeId)
+      throws Exception {
+    byte[] id = null;
+    RemoteIterator<FileStatus> files = fc.listStatus(parent);
+    FSDataInputStream fsDataInputStream = null;
+    byte[] uuid = createUUID(appId);
+    while(files.hasNext()) {
+      try {
+        Path checkPath = files.next().getPath();
+        if (checkPath.getName().contains(LogAggregationUtils
+            .getNodeString(nodeId)) && !checkPath.getName()
+                .endsWith(CHECK_SUM_FILE_SUFFIX)) {
+          fsDataInputStream = fc.open(checkPath);
+          byte[] b = new byte[uuid.length];
+          int actual = fsDataInputStream.read(b);
+          if (actual != uuid.length || Arrays.equals(b, uuid)) {
+            deleteFileWithRetries(fc, checkPath);
+          } else if (id == null){
+            id = uuid;
+          }
+        }
+      } finally {
+        IOUtils.cleanupWithLogger(LOG, fsDataInputStream);
+      }
+    }
+    return id == null ? uuid : id;
+  }
+
+  @Private
+  @VisibleForTesting
+  public boolean isRollover(final FileContext fc,
+      final Path candidate) throws IOException {
+    FileStatus fs = fc.getFileStatus(candidate);
+    return fs.getLen() >= this.logRollOverMaxFileSize;
+  }
+
+  @Private
+  @VisibleForTesting
+  public Clock getSystemClock() {
+    return SystemClock.getInstance();
+  }
+
+  private byte[] createUUID(ApplicationId appId) throws IOException {
+    try {
+      MessageDigest digest = MessageDigest.getInstance("SHA-256");
+      return digest.digest(appId.toString().getBytes(
+          Charset.forName("UTF-8")));
+    } catch (NoSuchAlgorithmException ex) {
+      throw new IOException(ex);
+    }
+  }
 }

+ 61 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java

@@ -21,13 +21,13 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.Writer;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,6 +37,8 @@ import java.util.Set;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -54,6 +56,8 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -143,8 +147,27 @@ public class TestLogAggregationIndexFileController {
     LogValue value = mock(LogValue.class);
     when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files);
 
+    final ControlledClock clock = new ControlledClock();
+    clock.setTime(System.currentTimeMillis());
     LogAggregationIndexedFileController fileFormat
-        = new LogAggregationIndexedFileController();
+        = new LogAggregationIndexedFileController() {
+          private int rollOverCheck = 0;
+          @Override
+          public Clock getSystemClock() {
+            return clock;
+          }
+
+          @Override
+          public boolean isRollover(final FileContext fc,
+              final Path candidate) throws IOException {
+            rollOverCheck++;
+            if (rollOverCheck >= 3) {
+              return true;
+            }
+            return false;
+          }
+        };
+
     fileFormat.initialize(conf, "Indexed");
 
     Map<ApplicationAccessType, String> appAcls = new HashMap<>();
@@ -203,7 +226,11 @@ public class TestLogAggregationIndexFileController {
         + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX);
     FSDataOutputStream fInput = null;
     try {
+      String nodeName = logPath.getName() + "_" + clock.getTime();
       fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK);
+      fInput.writeInt(nodeName.length());
+      fInput.write(nodeName.getBytes(
+          Charset.forName("UTF-8")));
       fInput.writeLong(0);
     } finally {
       IOUtils.closeQuietly(fInput);
@@ -236,9 +263,9 @@ public class TestLogAggregationIndexFileController {
 
     // We did not call postWriter which we would keep the checksum file.
     // We can only get the logs/logmeta from the first write.
-    fileFormat.readAggregatedLogsMeta(
+    meta = fileFormat.readAggregatedLogsMeta(
         logRequest);
-    Assert.assertEquals(meta.size(), meta.size(), 1);
+    Assert.assertEquals(meta.size(), 1);
     for (ContainerLogMeta log : meta) {
       Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
       Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
@@ -267,9 +294,37 @@ public class TestLogAggregationIndexFileController {
     fileFormat.write(key1, value2);
     fileFormat.postWrite(context);
     fileFormat.closeWriter();
-    fileFormat.readAggregatedLogsMeta(
+    meta = fileFormat.readAggregatedLogsMeta(
             logRequest);
-    Assert.assertEquals(meta.size(), meta.size(), 2);
+    Assert.assertEquals(meta.size(), 2);
+    for (ContainerLogMeta log : meta) {
+      Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
+      Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(newLogTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+    foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : newLogTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    sysOutStream.reset();
+
+    // start to roll over old logs
+    clock.setTime(System.currentTimeMillis());
+    fileFormat.initializeWriter(context);
+    fileFormat.write(key1, value2);
+    fileFormat.postWrite(context);
+    fileFormat.closeWriter();
+    FileStatus[] status = fs.listStatus(logPath.getParent());
+    Assert.assertTrue(status.length == 2);
+    meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(meta.size(), 3);
     for (ContainerLogMeta log : meta) {
       Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
       Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));