瀏覽代碼

HDFS-9782. RollingFileSystemSink should have configurable roll interval. (Daniel Templeton via kasha)

(cherry picked from commit 57c31a3fef625f1ec609d7e8873d4941f7ed5cbc)
Karthik Kambatla 9 年之前
父節點
當前提交
afe603d973

+ 291 - 62
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSink.java

@@ -31,6 +31,10 @@ import java.util.Date;
 import java.util.TimeZone;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.configuration.SubsetConfiguration;
 import org.apache.commons.lang.time.FastDateFormat;
@@ -53,14 +57,14 @@ import org.apache.hadoop.security.UserGroupInformation;
 /**
  * <p>This class is a metrics sink that uses
  * {@link org.apache.hadoop.fs.FileSystem} to write the metrics logs.  Every
- * hour a new directory will be created under the path specified by the
+ * roll interval a new directory will be created under the path specified by the
  * <code>basepath</code> property. All metrics will be logged to a file in the
- * current hour's directory in a file named &lt;hostname&gt;.log, where
+ * current interval's directory in a file named &lt;hostname&gt;.log, where
  * &lt;hostname&gt; is the name of the host on which the metrics logging
  * process is running. The base path is set by the
  * <code>&lt;prefix&gt;.sink.&lt;instance&gt;.basepath</code> property.  The
- * time zone used to create the current hour's directory name is GMT.  If the
- * <code>basepath</code> property isn't specified, it will default to
+ * time zone used to create the current interval's directory name is GMT.  If
+ * the <code>basepath</code> property isn't specified, it will default to
  * &quot;/tmp&quot;, which is the temp directory on whatever default file
  * system is configured for the cluster.</p>
  *
@@ -69,6 +73,26 @@ import org.apache.hadoop.security.UserGroupInformation;
  * writing a log file.  The default value is <code>true</code>.  When set to
  * <code>false</code>, file errors are quietly swallowed.</p>
  *
+ * <p>The <code>roll-interval</code> property sets the amount of time before
+ * rolling the directory. The default value is 1 hour. The roll interval may
+ * not be less than 1 minute. The property's value should be given as
+ * <i>number unit</i>, where <i>number</i> is an integer value, and
+ * <i>unit</i> is a valid unit.  Valid units are <i>minute</i>, <i>hour</i>,
+ * and <i>day</i>.  The units are case insensitive and may be abbreviated or
+ * plural. If no units are specified, hours are assumed. For example,
+ * &quot;2&quot;, &quot;2h&quot;, &quot;2 hour&quot;, and
+ * &quot;2 hours&quot; are all valid ways to specify two hours.</p>
+ *
+ * <p>The <code>roll-offset-interval-millis</code> property sets the upper
+ * bound on a random time interval (in milliseconds) that is used to delay
+ * before the initial roll.  All subsequent rolls will happen an integer
+ * number of roll intervals after the initial roll, hence retaining the original
+ * offset. The purpose of this property is to insert some variance in the roll
+ * times so that large clusters using this sink on every node don't cause a
+ * performance impact on HDFS by rolling simultaneously.  The default value is
+ * 30000 (30s).  When writing to HDFS, as a rule of thumb, the roll offset in
+ * millis should be no less than the number of sink instances times 5.</p>
+ *
  * <p>The primary use of this class is for logging to HDFS.  As it uses
  * {@link org.apache.hadoop.fs.FileSystem} to access the target file system,
  * however, it can be used to write to the local file system, Amazon S3, or any
@@ -79,7 +103,8 @@ import org.apache.hadoop.security.UserGroupInformation;
  * <p>Not all file systems support the ability to append to files.  In file
  * systems without the ability to append to files, only one writer can write to
  * a file at a time.  To allow for concurrent writes from multiple daemons on a
- * single host, the <code>source</code> property should be set to the name of
+ * single host, the <code>source</code> property is used to set unique headers
+ * for the log files.  The property should be set to the name of
  * the source daemon, e.g. <i>namenode</i>.  The value of the
  * <code>source</code> property should typically be the same as the property's
  * prefix.  If this property is not set, the source is taken to be
@@ -105,7 +130,7 @@ import org.apache.hadoop.security.UserGroupInformation;
  * 3.</p>
  *
  * <p>Note also that when writing to HDFS, the file size information is not
- * updated until the file is closed (e.g. at the top of the hour) even though
+ * updated until the file is closed (at the end of the interval) even though
  * the data is being written successfully. This is a known HDFS limitation that
  * exists because of the performance cost of updating the metadata.  See
  * <a href="https://issues.apache.org/jira/browse/HDFS-5478">HDFS-5478</a>.</p>
@@ -124,21 +149,32 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   private static final String BASEPATH_KEY = "basepath";
   private static final String SOURCE_KEY = "source";
   private static final String IGNORE_ERROR_KEY = "ignore-error";
+  private static final boolean DEFAULT_IGNORE_ERROR = false;
   private static final String ALLOW_APPEND_KEY = "allow-append";
+  private static final boolean DEFAULT_ALLOW_APPEND = false;
   private static final String KEYTAB_PROPERTY_KEY = "keytab-key";
   private static final String USERNAME_PROPERTY_KEY = "principal-key";
+  private static final String ROLL_INTERVAL_KEY = "roll-interval";
+  private static final String DEFAULT_ROLL_INTERVAL = "1h";
+  private static final String ROLL_OFFSET_INTERVAL_MILLIS_KEY =
+      "roll-offset-interval-millis";
+  private static final int DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS = 30000;
   private static final String SOURCE_DEFAULT = "unknown";
   private static final String BASEPATH_DEFAULT = "/tmp";
   private static final FastDateFormat DATE_FORMAT =
-      FastDateFormat.getInstance("yyyyMMddHH", TimeZone.getTimeZone("GMT"));
+      FastDateFormat.getInstance("yyyyMMddHHmm", TimeZone.getTimeZone("GMT"));
   private final Object lock = new Object();
   private boolean initialized = false;
   private SubsetConfiguration properties;
   private Configuration conf;
-  private String source;
-  private boolean ignoreError;
-  private boolean allowAppend;
-  private Path basePath;
+  @VisibleForTesting
+  protected String source;
+  @VisibleForTesting
+  protected boolean ignoreError;
+  @VisibleForTesting
+  protected boolean allowAppend;
+  @VisibleForTesting
+  protected Path basePath;
   private FileSystem fileSystem;
   // The current directory path into which we're writing files
   private Path currentDirPath;
@@ -149,11 +185,21 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   // We keep this only to be able to call hsynch() on it.
   private FSDataOutputStream currentFSOutStream;
   private Timer flushTimer;
-
-  // This flag is used during testing to make the flusher thread run after only
-  // a short pause instead of waiting for the top of the hour.
+  // The amount of time between rolls
+  @VisibleForTesting
+  protected long rollIntervalMillis;
+  // The maximum amount of random time to add to the initial roll
+  @VisibleForTesting
+  protected long rollOffsetIntervalMillis;
+  // The time of the next flush
+  @VisibleForTesting
+  protected Calendar nextFlush = null;
+  // This flag when true causes a metrics write to schedule a flush thread to
+  // run immediately, but only if a flush thread is already scheduled. (It's a
+  // timing thing.  If the first write forces the flush, it will strand the
+  // second write.)
   @VisibleForTesting
-  protected static boolean flushQuickly = false;
+  protected static boolean forceFlush = false;
   // This flag is used by the flusher thread to indicate that it has run. Used
   // only for testing purposes.
   @VisibleForTesting
@@ -165,13 +211,36 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
   @VisibleForTesting
   protected static FileSystem suppliedFilesystem = null;
 
+  /**
+   * Create an empty instance.  Required for reflection.
+   */
+  public RollingFileSystemSink() {
+  }
+
+  /**
+   * Create an instance for testing.
+   *
+   * @param flushIntervalMillis the roll interval in millis
+   * @param flushOffsetIntervalMillis the roll offset interval in millis
+   */
+  @VisibleForTesting
+  protected RollingFileSystemSink(long flushIntervalMillis,
+      long flushOffsetIntervalMillis) {
+    this.rollIntervalMillis = flushIntervalMillis;
+    this.rollOffsetIntervalMillis = flushOffsetIntervalMillis;
+  }
+
   @Override
   public void init(SubsetConfiguration metrics2Properties) {
     properties = metrics2Properties;
     basePath = new Path(properties.getString(BASEPATH_KEY, BASEPATH_DEFAULT));
     source = properties.getString(SOURCE_KEY, SOURCE_DEFAULT);
-    ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, false);
-    allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, false);
+    ignoreError = properties.getBoolean(IGNORE_ERROR_KEY, DEFAULT_IGNORE_ERROR);
+    allowAppend = properties.getBoolean(ALLOW_APPEND_KEY, DEFAULT_ALLOW_APPEND);
+    rollOffsetIntervalMillis =
+        getNonNegative(ROLL_OFFSET_INTERVAL_MILLIS_KEY,
+          DEFAULT_ROLL_OFFSET_INTERVAL_MILLIS);
+    rollIntervalMillis = getRollInterval();
 
     conf = loadConf();
     UserGroupInformation.setConfiguration(conf);
@@ -179,8 +248,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
     // Don't do secure setup if it's not needed.
     if (UserGroupInformation.isSecurityEnabled()) {
       // Validate config so that we don't get an NPE
-      checkForProperty(properties, KEYTAB_PROPERTY_KEY);
-      checkForProperty(properties, USERNAME_PROPERTY_KEY);
+      checkIfPropertyExists(KEYTAB_PROPERTY_KEY);
+      checkIfPropertyExists(USERNAME_PROPERTY_KEY);
 
 
       try {
@@ -228,6 +297,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
       }
 
       flushTimer = new Timer("RollingFileSystemSink Flusher", true);
+      setInitialFlushTime(new Date());
     }
 
     return success;
@@ -238,8 +308,6 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * strings, allowing for either the property or the configuration not to be
    * set.
    *
-   * @param properties the sink properties
-   * @param conf the conf
    * @param property the property to stringify
    * @return the stringified property
    */
@@ -264,15 +332,98 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
     return securityProperty;
   }
 
+  /**
+   * Extract the roll interval from the configuration and return it in
+   * milliseconds.
+   *
+   * @return the roll interval in millis
+   */
+  @VisibleForTesting
+  protected long getRollInterval() {
+    String rollInterval =
+        properties.getString(ROLL_INTERVAL_KEY, DEFAULT_ROLL_INTERVAL);
+    Pattern pattern = Pattern.compile("^\\s*(\\d+)\\s*([A-Za-z]*)\\s*$");
+    Matcher match = pattern.matcher(rollInterval);
+    long millis;
+
+    if (match.matches()) {
+      String flushUnit = match.group(2);
+      int rollIntervalInt;
+
+      try {
+        rollIntervalInt = Integer.parseInt(match.group(1));
+      } catch (NumberFormatException ex) {
+        throw new MetricsException("Unrecognized flush interval: "
+            + rollInterval + ". Must be a number followed by an optional "
+            + "unit. The unit must be one of: minute, hour, day", ex);
+      }
+
+      if ("".equals(flushUnit)) {
+        millis = TimeUnit.HOURS.toMillis(rollIntervalInt);
+      } else {
+        switch (flushUnit.toLowerCase()) {
+        case "m":
+        case "min":
+        case "minute":
+        case "minutes":
+          millis = TimeUnit.MINUTES.toMillis(rollIntervalInt);
+          break;
+        case "h":
+        case "hr":
+        case "hour":
+        case "hours":
+          millis = TimeUnit.HOURS.toMillis(rollIntervalInt);
+          break;
+        case "d":
+        case "day":
+        case "days":
+          millis = TimeUnit.DAYS.toMillis(rollIntervalInt);
+          break;
+        default:
+          throw new MetricsException("Unrecognized unit for flush interval: "
+              + flushUnit + ". Must be one of: minute, hour, day");
+        }
+      }
+    } else {
+      throw new MetricsException("Unrecognized flush interval: "
+          + rollInterval + ". Must be a number followed by an optional unit."
+          + " The unit must be one of: minute, hour, day");
+    }
+
+    if (millis < 60000) {
+      throw new MetricsException("The flush interval property must be "
+          + "at least 1 minute. Value was " + rollInterval);
+    }
+
+    return millis;
+  }
+
+  /**
+   * Return the property value if it's non-negative and throw an exception if
+   * it's not.
+   *
+   * @param key the property key
+   * @param defaultValue the default value
+   */
+  private long getNonNegative(String key, int defaultValue) {
+    int flushOffsetIntervalMillis = properties.getInt(key, defaultValue);
+
+    if (flushOffsetIntervalMillis < 0) {
+      throw new MetricsException("The " + key + " property must be "
+          + "non-negative. Value was " + flushOffsetIntervalMillis);
+    }
+
+    return flushOffsetIntervalMillis;
+  }
+
   /**
    * Throw a {@link MetricsException} if the given property is not set.
    *
-   * @param conf the configuration to test
    * @param key the key to validate
    */
-  private static void checkForProperty(SubsetConfiguration conf, String key) {
-    if (!conf.containsKey(key)) {
-      throw new MetricsException("Configuration is missing " + key
+  private void checkIfPropertyExists(String key) {
+    if (!properties.containsKey(key)) {
+      throw new MetricsException("Metrics2 configuration is missing " + key
           + " property");
     }
   }
@@ -301,7 +452,6 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * Return the supplied file system for testing or otherwise get a new file
    * system.
    *
-   * @param conf the configuration
    * @return the file system to use
    * @throws MetricsException thrown if the file system could not be retrieved
    */
@@ -327,6 +477,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
 
   /**
    * Test whether the file system supports append and return the answer.
+   *
    * @param fs the target file system
    */
   private boolean checkAppend(FileSystem fs) {
@@ -351,14 +502,14 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * new directory or new log file
    */
   private void rollLogDirIfNeeded() throws MetricsException {
+    // Because we're working relative to the clock, we use a Date instead
+    // of Time.monotonicNow().
     Date now = new Date();
-    String currentDir = DATE_FORMAT.format(now);
-    Path path = new Path(basePath, currentDir);
 
     // We check whether currentOutStream is null instead of currentDirPath,
     // because if currentDirPath is null, then currentOutStream is null, but
-    // currentOutStream can be null for other reasons.
-    if ((currentOutStream == null) || !path.equals(currentDirPath)) {
+    // currentOutStream can be null for other reasons.  Same for nextFlush.
+    if ((currentOutStream == null) || now.after(nextFlush.getTime())) {
       // If we're not yet connected to HDFS, create the connection
       if (!initialized) {
         initialized = initFs();
@@ -372,7 +523,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
           currentOutStream.close();
         }
 
-        currentDirPath = path;
+        currentDirPath = findCurrentDirectory(now);
 
         try {
           rollLogDir();
@@ -380,34 +531,41 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
           throwMetricsException("Failed to create new log file", ex);
         }
 
-        scheduleFlush(now);
+        // Update the time of the next flush
+        updateFlushTime(now);
+        // Schedule the next flush at that time
+        scheduleFlush(nextFlush.getTime());
       }
+    } else if (forceFlush) {
+      scheduleFlush(new Date());
     }
   }
 
   /**
-   * Schedule the current hour's directory to be flushed at the top of the next
-   * hour. If this ends up running after the top of the next hour, it will
-   * execute immediately.
+   * Use the given time to determine the current directory. The current
+   * directory will be based on the {@link #rollIntervalMillis}.
    *
    * @param now the current time
+   * @return the current directory
    */
-  private void scheduleFlush(Date now) {
-    // Store the current currentDirPath to close later
-    final PrintStream toClose = currentOutStream;
-    Calendar next = Calendar.getInstance();
+  private Path findCurrentDirectory(Date now) {
+    long offset = ((now.getTime() - nextFlush.getTimeInMillis())
+        / rollIntervalMillis) * rollIntervalMillis;
+    String currentDir =
+        DATE_FORMAT.format(new Date(nextFlush.getTimeInMillis() + offset));
 
-    next.setTime(now);
+    return new Path(basePath, currentDir);
+  }
 
-    if (flushQuickly) {
-      // If we're running unit tests, flush after a short pause
-      next.add(Calendar.MILLISECOND, 400);
-    } else {
-      // Otherwise flush at the top of the hour
-      next.set(Calendar.SECOND, 0);
-      next.set(Calendar.MINUTE, 0);
-      next.add(Calendar.HOUR, 1);
-    }
+  /**
+   * Schedule the current interval's directory to be flushed. If this ends up
+   * running after the top of the next interval, it will execute immediately.
+   *
+   * @param when the time the thread should run
+   */
+  private void scheduleFlush(Date when) {
+    // Store the current currentDirPath to close later
+    final PrintStream toClose = currentOutStream;
 
     flushTimer.schedule(new TimerTask() {
       @Override
@@ -420,11 +578,81 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
 
         hasFlushed = true;
       }
-    }, next.getTime());
+    }, when);
+  }
+
+  /**
+   * Update the {@link #nextFlush} variable to the next flush time. Add
+   * an integer number of flush intervals, preserving the initial random offset.
+   *
+   * @param now the current time
+   */
+  @VisibleForTesting
+  protected void updateFlushTime(Date now) {
+    // In non-initial rounds, add an integer number of intervals to the last
+    // flush until a time in the future is achieved, thus preserving the
+    // original random offset.
+    int millis =
+        (int) (((now.getTime() - nextFlush.getTimeInMillis())
+        / rollIntervalMillis + 1) * rollIntervalMillis);
+
+    nextFlush.add(Calendar.MILLISECOND, millis);
+  }
+
+  /**
+   * Set the {@link #nextFlush} variable to the initial flush time. The initial
+   * flush will be an integer number of flush intervals past the beginning of
+   * the current hour and will have a random offset added, up to
+   * {@link #rollOffsetIntervalMillis}. The initial flush will be a time in the
+   * past from which future flush times can be calculated.
+   *
+   * @param now the current time
+   */
+  @VisibleForTesting
+  protected void setInitialFlushTime(Date now) {
+    // Start with the beginning of the current hour
+    nextFlush = Calendar.getInstance();
+    nextFlush.setTime(now);
+    nextFlush.set(Calendar.MILLISECOND, 0);
+    nextFlush.set(Calendar.SECOND, 0);
+    nextFlush.set(Calendar.MINUTE, 0);
+
+    // In the first round, calculate the first flush as the largest number of
+    // intervals from the beginning of the current hour that's not in the
+    // future by:
+    // 1. Subtract the beginning of the hour from the current time
+    // 2. Divide by the roll interval and round down to get the number of whole
+    //    intervals that have passed since the beginning of the hour
+    // 3. Multiply by the roll interval to get the number of millis between
+    //    the beginning of the current hour and the beginning of the current
+    //    interval.
+    int millis = (int) (((now.getTime() - nextFlush.getTimeInMillis())
+        / rollIntervalMillis) * rollIntervalMillis);
+
+    // Then add some noise to help prevent all the nodes from
+    // closing their files at the same time.
+    if (rollOffsetIntervalMillis > 0) {
+      millis += ThreadLocalRandom.current().nextLong(rollOffsetIntervalMillis);
+
+      // If the added time puts us into the future, step back one roll interval
+      // because the code to increment nextFlush to the next flush expects that
+      // nextFlush is the next flush from the previous interval.  There wasn't
+      // a previous interval, so we just fake it with the time in the past that
+      // would have been the previous interval if there had been one.
+      //
+      // It's OK if millis comes out negative.
+      while (nextFlush.getTimeInMillis() + millis > now.getTime()) {
+        millis -= rollIntervalMillis;
+      }
+    }
+
+    // Adjust the next flush time by millis to get the time of our fictitious
+    // previous next flush
+    nextFlush.add(Calendar.MILLISECOND, millis);
   }
 
   /**
-   * Create a new directory based on the current hour and a new log file in
+   * Create a new directory based on the current interval and a new log file in
    * that directory.
    *
    * @throws IOException thrown if an error occurs while creating the
@@ -451,7 +679,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * path is found.
    *
    * Once the file is open, update {@link #currentFSOutStream},
-   * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
+   * {@link #currentOutStream}, and {@link #currentFilePath}
+   * appropriately.
    *
    * @param initial the target path
    * @throws IOException thrown if the call to see if the exists fails
@@ -552,7 +781,7 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * instead.
    *
    * Once the file is open, update {@link #currentFSOutStream},
-   * {@link #currentOutStream}, and {@#link #currentFile} are set appropriately.
+   * {@link #currentOutStream}, and {@link #currentFilePath}.
    *
    * @param initial the target path
    * @throws IOException thrown if the call to see the append operation fails.
@@ -615,9 +844,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
         currentOutStream.println();
 
         // If we don't hflush(), the data may not be written until the file is
-        // closed. The file won't be closed until the top of the hour *AND*
+        // closed. The file won't be closed until the end of the interval *AND*
         // another record is received. Calling hflush() makes sure that the data
-        // is complete at the top of the hour.
+        // is complete at the end of the interval.
         try {
           currentFSOutStream.hflush();
         } catch (IOException ex) {
@@ -668,8 +897,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * as the new exception's message with the current file name
    * ({@link #currentFilePath}) appended to it.
    *
-   * @param message the exception message. The message will have the current
-   * file name ({@link #currentFilePath}) appended to it.
+   * @param message the exception message. The message will have a colon and
+   * the current file name ({@link #currentFilePath}) appended to it.
    * @throws MetricsException thrown if there was an error and the sink isn't
    * ignoring errors
    */
@@ -687,9 +916,9 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * ({@link #currentFilePath}) and the Throwable's string representation
    * appended to it.
    *
-   * @param message the exception message. The message will have the current
-   * file name ({@link #currentFilePath}) and the Throwable's string
-   * representation appended to it.
+   * @param message the exception message. The message will have a colon, the
+   * current file name ({@link #currentFilePath}), and the Throwable's string
+   * representation (wrapped in square brackets) appended to it.
    * @param t the Throwable to wrap
    */
   private void throwMetricsException(String message, Throwable t) {
@@ -705,8 +934,8 @@ public class RollingFileSystemSink implements MetricsSink, Closeable {
    * new exception's message with the current file name
    * ({@link #currentFilePath}) appended to it.
    *
-   * @param message the exception message. The message will have the current
-   * file name ({@link #currentFilePath}) appended to it.
+   * @param message the exception message. The message will have a colon and
+   * the current file name ({@link #currentFilePath}) appended to it.
    */
   private void throwMetricsException(String message) {
     if (!ignoreError) {

+ 9 - 7
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/RollingFileSystemSinkTestBase.java

@@ -180,10 +180,12 @@ public class RollingFileSystemSinkTestBase {
         .add(prefix + ".sink.mysink0.source", "testsrc")
         .add(prefix + ".sink.mysink0.context", "test1")
         .add(prefix + ".sink.mysink0.ignore-error", ignoreErrors)
-        .add(prefix + ".sink.mysink0.allow-append", allowAppend);
+        .add(prefix + ".sink.mysink0.allow-append", allowAppend)
+        .add(prefix + ".sink.mysink0.roll-offset-interval-millis", 0)
+        .add(prefix + ".sink.mysink0.roll-interval", "1h");
 
     if (useSecureParams) {
-        builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY)
+      builder.add(prefix + ".sink.mysink0.keytab-key", SINK_KEYTAB_FILE_KEY)
         .add(prefix + ".sink.mysink0.principal-key", SINK_PRINCIPAL_KEY);
     }
 
@@ -210,7 +212,7 @@ public class RollingFileSystemSinkTestBase {
    */
   protected String doWriteTest(MetricsSystem ms, String path, int count)
       throws IOException, URISyntaxException {
-    final String then = DATE_FORMAT.format(new Date());
+    final String then = DATE_FORMAT.format(new Date()) + "00";
 
     MyMetrics1 mm1 = new MyMetrics1().registerWith(ms);
     new MyMetrics2().registerWith(ms);
@@ -239,7 +241,7 @@ public class RollingFileSystemSinkTestBase {
    */
   protected String readLogFile(String path, String then, int count)
       throws IOException, URISyntaxException {
-    final String now = DATE_FORMAT.format(new Date());
+    final String now = DATE_FORMAT.format(new Date()) + "00";
     final String logFile = getLogFilename();
     FileSystem fs = FileSystem.get(new URI(path), new Configuration());
     StringBuilder metrics = new StringBuilder();
@@ -426,7 +428,7 @@ public class RollingFileSystemSinkTestBase {
     Calendar now = getNowNotTopOfHour();
 
     FileSystem fs = FileSystem.get(new URI(path), new Configuration());
-    Path dir = new Path(path, DATE_FORMAT.format(now.getTime()));
+    Path dir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
 
     fs.mkdirs(dir);
 
@@ -494,8 +496,8 @@ public class RollingFileSystemSinkTestBase {
     }
 
     assertTrue("The sink created additional unexpected log files. " + count
-        + "files were created", expected >= count);
-    assertTrue("The sink created too few log files. " + count + "files were "
+        + " files were created", expected >= count);
+    assertTrue("The sink created too few log files. " + count + " files were "
         + "created", expected <= count);
   }
 

+ 194 - 91
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSink.java

@@ -18,141 +18,244 @@
 
 package org.apache.hadoop.metrics2.sink;
 
-import org.apache.hadoop.metrics2.MetricsSystem;
+import java.util.Calendar;
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.impl.ConfigBuilder;
 
 import org.junit.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
- * Test the {@link RollingFileSystemSink} class in the context of the local file
- * system.
+ * Test that the init() method picks up all the configuration settings
+ * correctly.
  */
-public class TestRollingFileSystemSink extends RollingFileSystemSinkTestBase {
-  /**
-   * Test writing logs to the local file system.
-   * @throws Exception when things break
-   */
+public class TestRollingFileSystemSink {
   @Test
-  public void testWrite() throws Exception {
-    String path = methodDir.getAbsolutePath();
-    MetricsSystem ms = initMetricsSystem(path, false, false);
+  public void testInit() {
+    ConfigBuilder builder = new ConfigBuilder();
+    SubsetConfiguration conf =
+        builder.add("sink.roll-interval", "10m")
+            .add("sink.roll-offset-interval-millis", "1")
+            .add("sink.basepath", "path")
+            .add("sink.ignore-error", "true")
+            .add("sink.allow-append", "true")
+            .add("sink.source", "src")
+            .subset("sink");
 
-    assertMetricsContents(doWriteTest(ms, path, 1));
-  }
+    RollingFileSystemSink sink = new RollingFileSystemSink();
 
-  /**
-   * Test writing logs to the local file system with the sink set to ignore
-   * errors.
-   * @throws Exception when things break
-   */
-  @Test
-  public void testSilentWrite() throws Exception {
-    String path = methodDir.getAbsolutePath();
-    MetricsSystem ms = initMetricsSystem(path, true, false);
+    sink.init(conf);
 
-    assertMetricsContents(doWriteTest(ms, path, 1));
+    assertEquals("The roll interval was not set correctly",
+        sink.rollIntervalMillis, 600000);
+    assertEquals("The roll offset interval was not set correctly",
+        sink.rollOffsetIntervalMillis, 1);
+    assertEquals("The base path was not set correctly",
+        sink.basePath, new Path("path"));
+    assertEquals("ignore-error was not set correctly",
+        sink.ignoreError, true);
+    assertEquals("allow-append was not set correctly",
+        sink.allowAppend, true);
+    assertEquals("The source was not set correctly",
+        sink.source, "src");
   }
 
   /**
-   * Test writing logs to HDFS when the log file already exists.
-   *
-   * @throws Exception when things break
+   * Test whether the initial flush time is set correctly.
    */
   @Test
-  public void testExistingWrite() throws Exception {
-    String path = methodDir.getAbsolutePath();
+  public void testSetInitialFlushTime() {
+    RollingFileSystemSink rfsSink = new RollingFileSystemSink(1000, 0);
+    Calendar calendar = Calendar.getInstance();
 
-    assertMetricsContents(doAppendTest(path, false, false, 2));
-  }
+    calendar.set(Calendar.MILLISECOND, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.HOUR, 0);
+    calendar.set(Calendar.DAY_OF_YEAR, 1);
+    calendar.set(Calendar.YEAR, 2016);
 
-  /**
-   * Test writing logs to HDFS when the log file and the .1 log file already
-   * exist.
-   *
-   * @throws Exception when things break
-   */
-  @Test
-  public void testExistingWrite2() throws Exception {
-    String path = methodDir.getAbsolutePath();
-    MetricsSystem ms = initMetricsSystem(path, false, false);
+    assertNull("Last flush time should have been null prior to calling init()",
+        rfsSink.nextFlush);
+
+    rfsSink.setInitialFlushTime(calendar.getTime());
+
+    long diff =
+        rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
+
+    assertEquals("The initial flush time was calculated incorrectly", 0L, diff);
+
+    calendar.set(Calendar.MILLISECOND, 10);
+    rfsSink.setInitialFlushTime(calendar.getTime());
+    diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
+
+    assertEquals("The initial flush time was calculated incorrectly",
+        -10L, diff);
+
+    calendar.set(Calendar.SECOND, 1);
+    calendar.set(Calendar.MILLISECOND, 10);
+    rfsSink.setInitialFlushTime(calendar.getTime());
+    diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
+
+    assertEquals("The initial flush time was calculated incorrectly",
+        -10L, diff);
+
+    // Try again with a random offset
+    rfsSink = new RollingFileSystemSink(1000, 100);
 
-    preCreateLogFile(path, 2);
+    assertNull("Last flush time should have been null prior to calling init()",
+        rfsSink.nextFlush);
 
-    assertMetricsContents(doWriteTest(ms, path, 3));
+    calendar.set(Calendar.MILLISECOND, 0);
+    calendar.set(Calendar.SECOND, 0);
+    rfsSink.setInitialFlushTime(calendar.getTime());
+
+    diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
+
+    assertTrue("The initial flush time was calculated incorrectly: " + diff,
+        (diff >= -1000L) && (diff < -900L));
+
+    calendar.set(Calendar.MILLISECOND, 10);
+    rfsSink.setInitialFlushTime(calendar.getTime());
+    diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
+
+    assertTrue("The initial flush time was calculated incorrectly: " + diff,
+        ((diff >= -10L) && (diff <= 0L) ||
+            ((diff > -1000L) && (diff < -910L))));
+
+    calendar.set(Calendar.SECOND, 1);
+    calendar.set(Calendar.MILLISECOND, 10);
+    rfsSink.setInitialFlushTime(calendar.getTime());
+    diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
+
+    assertTrue("The initial flush time was calculated incorrectly: " + diff,
+        ((diff >= -10L) && (diff <= 0L) ||
+            ((diff > -1000L) && (diff < -910L))));
+
+    // Now try pathological settings
+    rfsSink = new RollingFileSystemSink(1000, 1000000);
+
+    assertNull("Last flush time should have been null prior to calling init()",
+        rfsSink.nextFlush);
+
+    calendar.set(Calendar.MILLISECOND, 1);
+    calendar.set(Calendar.SECOND, 0);
+    rfsSink.setInitialFlushTime(calendar.getTime());
+
+    diff = rfsSink.nextFlush.getTimeInMillis() - calendar.getTimeInMillis();
+
+    assertTrue("The initial flush time was calculated incorrectly: " + diff,
+        (diff > -1000L) && (diff <= 0L));
   }
 
   /**
-   * Test writing logs to HDFS with ignore errors enabled when
-   * the log file already exists.
-   *
-   * @throws Exception when things break
+   * Test that the roll time updates correctly.
    */
   @Test
-  public void testSilentExistingWrite() throws Exception {
-    String path = methodDir.getAbsolutePath();
+  public void testUpdateRollTime() {
+    RollingFileSystemSink rfsSink = new RollingFileSystemSink(1000, 0);
+    Calendar calendar = Calendar.getInstance();
+
+    calendar.set(Calendar.MILLISECOND, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.HOUR, 0);
+    calendar.set(Calendar.DAY_OF_YEAR, 1);
+    calendar.set(Calendar.YEAR, 2016);
+
+    rfsSink.nextFlush = Calendar.getInstance();
+    rfsSink.nextFlush.setTime(calendar.getTime());
+    rfsSink.updateFlushTime(calendar.getTime());
+
+    assertEquals("The next roll time should have been 1 second in the future",
+        calendar.getTimeInMillis() + 1000,
+        rfsSink.nextFlush.getTimeInMillis());
+
+    rfsSink.nextFlush.setTime(calendar.getTime());
+    calendar.add(Calendar.MILLISECOND, 10);
+    rfsSink.updateFlushTime(calendar.getTime());
 
-    assertMetricsContents(doAppendTest(path, false, false, 2));
+    assertEquals("The next roll time should have been 990 ms in the future",
+        calendar.getTimeInMillis() + 990,
+        rfsSink.nextFlush.getTimeInMillis());
+
+    rfsSink.nextFlush.setTime(calendar.getTime());
+    calendar.add(Calendar.SECOND, 2);
+    calendar.add(Calendar.MILLISECOND, 10);
+    rfsSink.updateFlushTime(calendar.getTime());
+
+    assertEquals("The next roll time should have been 990 ms in the future",
+        calendar.getTimeInMillis() + 990,
+        rfsSink.nextFlush.getTimeInMillis());
   }
 
   /**
-   * Test that writing fails when the directory isn't writable.
+   * Test whether the roll interval is correctly calculated from the
+   * configuration settings.
    */
   @Test
-  public void testFailedWrite() {
-    String path = methodDir.getAbsolutePath();
-    MetricsSystem ms = initMetricsSystem(path, false, false);
+  public void testGetRollInterval() {
+    doTestGetRollInterval(1, new String[] {"m", "min", "minute", "minutes"},
+        60 * 1000L);
+    doTestGetRollInterval(1, new String[] {"h", "hr", "hour", "hours"},
+        60 * 60 * 1000L);
+    doTestGetRollInterval(1, new String[] {"d", "day", "days"},
+        24 * 60 * 60 * 1000L);
 
-    new MyMetrics1().registerWith(ms);
+    ConfigBuilder builder = new ConfigBuilder();
+    SubsetConfiguration conf =
+        builder.add("sink.roll-interval", "1").subset("sink");
+    // We can reuse the same sink every time because we're setting the same
+    // property every time.
+    RollingFileSystemSink sink = new RollingFileSystemSink();
 
-    methodDir.setWritable(false);
-    MockSink.errored = false;
+    sink.init(conf);
 
-    try {
-      // publish the metrics
-      ms.publishMetricsNow();
+    assertEquals(3600000L, sink.getRollInterval());
 
-      assertTrue("No exception was generated while writing metrics "
-          + "even though the target directory was not writable",
-          MockSink.errored);
+    for (char c : "abcefgijklnopqrtuvwxyz".toCharArray()) {
+      builder = new ConfigBuilder();
+      conf = builder.add("sink.roll-interval", "90 " + c).subset("sink");
 
-      ms.stop();
-      ms.shutdown();
-    } finally {
-      // Make sure the dir is writable again so we can delete it at the end
-      methodDir.setWritable(true);
+      try {
+        sink.init(conf);
+        sink.getRollInterval();
+        fail("Allowed flush interval with bad units: " + c);
+      } catch (MetricsException ex) {
+        // Expected
+      }
     }
   }
 
   /**
-   * Test that writing fails silently when the directory is not writable.
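+   * Test that each spelling of a unit, in lower and upper case and with or
+   * without a space after the number, yields the expected roll interval.
+   *
+   * @param num the number of units to put in the roll-interval property
+   * @param units the unit spellings to test
+   * @param expected the value getRollInterval() is expected to return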
    */
-  @Test
-  public void testSilentFailedWrite() {
-    String path = methodDir.getAbsolutePath();
-    MetricsSystem ms = initMetricsSystem(path, true, false);
-
-    new MyMetrics1().registerWith(ms);
+  private void doTestGetRollInterval(int num, String[] units, long expected) {
+    RollingFileSystemSink sink = new RollingFileSystemSink();
+    ConfigBuilder builder = new ConfigBuilder();
 
-    methodDir.setWritable(false);
-    MockSink.errored = false;
+    for (String unit : units) {
+      sink.init(builder.add("sink.roll-interval", num + unit).subset("sink"));
+      assertEquals(expected, sink.getRollInterval());
 
-    try {
-      // publish the metrics
-      ms.publishMetricsNow();
+      sink.init(builder.add("sink.roll-interval",
+          num + unit.toUpperCase()).subset("sink"));
+      assertEquals(expected, sink.getRollInterval());
 
-      assertFalse("An exception was generated while writing metrics "
-          + "when the target directory was not writable, even though the "
-          + "sink is set to ignore errors",
-          MockSink.errored);
+      sink.init(builder.add("sink.roll-interval",
+          num + " " + unit).subset("sink"));
+      assertEquals(expected, sink.getRollInterval());
 
-      ms.stop();
-      ms.shutdown();
-    } finally {
-      // Make sure the dir is writable again so we can delete it at the end
-      methodDir.setWritable(true);
+      sink.init(builder.add("sink.roll-interval",
+          num + " " + unit.toUpperCase()).subset("sink"));
+      assertEquals(expected, sink.getRollInterval());
     }
   }
 }

+ 157 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithLocal.java

@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import org.apache.hadoop.metrics2.MetricsSystem;
+
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the {@link RollingFileSystemSink} class in the context of the local file
+ * system.
+ */
+public class TestRollingFileSystemSinkWithLocal
+    extends RollingFileSystemSinkTestBase {
+  /**
+   * Test writing logs to the local file system.
+   * @throws Exception when things break
+   */
+  @Test
+  public void testWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    assertMetricsContents(doWriteTest(ms, path, 1));
+  }
+
+  /**
+   * Test writing logs to the local file system with the sink set to ignore
+   * errors.
+   * @throws Exception when things break
+   */
+  @Test
+  public void testSilentWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    assertMetricsContents(doWriteTest(ms, path, 1));
+  }
+
+  /**
+   * Test writing to the local file system when the log file already exists.
+   *
+   * @throws Exception when things break
+   */
+  @Test
+  public void testExistingWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+
+    assertMetricsContents(doAppendTest(path, false, false, 2));
+  }
+
+  /**
+   * Test writing logs to the local file system when the log file and the .1
+   * log file already exist.
+   *
+   * @throws Exception when things break
+   */
+  @Test
+  public void testExistingWrite2() throws Exception {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    preCreateLogFile(path, 2);
+
+    assertMetricsContents(doWriteTest(ms, path, 3));
+  }
+
+  /**
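+   * Test writing logs to the local file system with ignore errors enabled
+   * when the log file already exists.
+   * NOTE(review): the doAppendTest() call below has the same arguments as
+   * the one in testExistingWrite() -- confirm ignore errors is really enabled.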
+   *
+   * @throws Exception when things break
+   */
+  @Test
+  public void testSilentExistingWrite() throws Exception {
+    String path = methodDir.getAbsolutePath();
+
+    assertMetricsContents(doAppendTest(path, false, false, 2));
+  }
+
+  /**
+   * Test that writing fails when the directory isn't writable.
+   */
+  @Test
+  public void testFailedWrite() {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, false, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    methodDir.setWritable(false);
+    MockSink.errored = false;
+
+    try {
+      // publish the metrics
+      ms.publishMetricsNow();
+
+      assertTrue("No exception was generated while writing metrics "
+          + "even though the target directory was not writable",
+          MockSink.errored);
+
+      ms.stop();
+      ms.shutdown();
+    } finally {
+      // Make sure the dir is writable again so we can delete it at the end
+      methodDir.setWritable(true);
+    }
+  }
+
+  /**
+   * Test that writing fails silently when the directory is not writable.
+   */
+  @Test
+  public void testSilentFailedWrite() {
+    String path = methodDir.getAbsolutePath();
+    MetricsSystem ms = initMetricsSystem(path, true, false);
+
+    new MyMetrics1().registerWith(ms);
+
+    methodDir.setWritable(false);
+    MockSink.errored = false;
+
+    try {
+      // publish the metrics
+      ms.publishMetricsNow();
+
+      assertFalse("An exception was generated while writing metrics "
+          + "when the target directory was not writable, even though the "
+          + "sink is set to ignore errors",
+          MockSink.errored);
+
+      ms.stop();
+      ms.shutdown();
+    } finally {
+      // Make sure the dir is writable again so we can delete it at the end
+      methodDir.setWritable(true);
+    }
+  }
+}

+ 17 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithHdfs.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.metrics2.sink;
 
 import java.io.IOException;
 import java.net.URI;
-import org.junit.After;
-import org.junit.Before;
 import java.util.Calendar;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +29,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -60,7 +60,6 @@ public class TestRollingFileSystemSinkWithHdfs
         new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
 
     // Also clear sink flags
-    RollingFileSystemSink.flushQuickly = false;
     RollingFileSystemSink.hasFlushed = false;
   }
 
@@ -256,10 +255,12 @@ public class TestRollingFileSystemSinkWithHdfs
    */
   @Test
   public void testFlushThread() throws Exception {
-    RollingFileSystemSink.flushQuickly = true;
+    // Cause the sink's flush thread to be run immediately after the second
+    // metrics log is written
+    RollingFileSystemSink.forceFlush = true;
 
     String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
-    MetricsSystem ms = initMetricsSystem(path, true, false);
+    MetricsSystem ms = initMetricsSystem(path, true, false, false);
 
     new MyMetrics1().registerWith(ms);
 
@@ -269,14 +270,21 @@ public class TestRollingFileSystemSinkWithHdfs
     // regardless.
     ms.publishMetricsNow();
 
-    // Sleep until the flusher has run
+    int count = 0;
+
+    // Sleep until the flusher has run. This should never actually need to
+    // sleep, but the sleep is here to make sure this test isn't flakey.
     while (!RollingFileSystemSink.hasFlushed) {
-      Thread.sleep(50L);
+      Thread.sleep(10L);
+
+      if (++count > 1000) {
+        fail("Flush thread did not run within 10 seconds");
+      }
     }
 
-    Calendar now = getNowNotTopOfHour();
+    Calendar now = Calendar.getInstance();
+    Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
     FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
-    Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()));
     Path currentFile =
         findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
     FileStatus status = fs.getFileStatus(currentFile);

+ 91 - 110
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/metrics2/sink/TestRollingFileSystemSinkWithSecureHdfs.java

@@ -49,8 +49,11 @@ import org.apache.hadoop.security.NullGroupsMapping;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Test;
-import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -66,58 +69,99 @@ public class TestRollingFileSystemSinkWithSecureHdfs
   private static String hdfsPrincipal;
   private static String hdfsKeytab;
   private static String spnegoPrincipal;
+  private MiniDFSCluster cluster = null;
+  private UserGroupInformation sink = null;
 
   /**
-   * Do a basic write test against an HDFS cluster with Kerberos enabled. We
-   * assume that if a basic write succeeds, more complex operations will also
-   * succeed.
+   * Setup the KDC for testing a secure HDFS cluster.
    *
-   * @throws Exception thrown if things break
+   * @throws Exception thrown if the KDC setup fails
    */
-  @Test
-  public void testWithSecureHDFS() throws Exception {
-    RollingFileSystemSink.flushQuickly = false;
-    RollingFileSystemSink.hasFlushed = false;
-    initKdc();
-
-    MiniDFSCluster cluster = null;
+  @BeforeClass
+  public static void initKdc() throws Exception {
+    Properties kdcConf = MiniKdc.createConf();
+    kdc = new MiniKdc(kdcConf, ROOT_TEST_DIR);
+    kdc.start();
 
-    try {
-      HdfsConfiguration conf = createSecureConfig("authentication,privacy");
+    File sinkKeytabFile = new File(ROOT_TEST_DIR, "sink.keytab");
+    sinkKeytab = sinkKeytabFile.getAbsolutePath();
+    kdc.createPrincipal(sinkKeytabFile, "sink/localhost");
+    sinkPrincipal = "sink/localhost@" + kdc.getRealm();
 
-      RollingFileSystemSink.suppliedConf = conf;
+    File hdfsKeytabFile = new File(ROOT_TEST_DIR, "hdfs.keytab");
+    hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
+    kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost",
+        "HTTP/localhost");
+    hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm();
+    spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
+  }
 
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
-          .build();
+  /**
+   * Setup the mini-DFS cluster.
+   *
+   * @throws Exception thrown if the cluster setup fails
+   */
+  @Before
+  public void initCluster() throws Exception {
+    HdfsConfiguration conf = createSecureConfig("authentication,privacy");
 
-      cluster.waitActive();
+    RollingFileSystemSink.hasFlushed = false;
+    RollingFileSystemSink.suppliedConf = conf;
 
-      UserGroupInformation sink = createDirectoriesSecurely(cluster);
-      final String path =
-          "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
-      final MetricsSystem ms = initMetricsSystem(path, true, false, true);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
+        .build();
+    cluster.waitActive();
+    createDirectoriesSecurely();
+  }
 
-      assertMetricsContents(
-          sink.doAs(new PrivilegedExceptionAction<String>() {
-            @Override
-            public String run() throws Exception {
-              return doWriteTest(ms, path, 1);
-            }
-          }));
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+  /**
+   * Stop the mini-DFS cluster.
+   */
+  @After
+  public void stopCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
 
-      shutdownKdc();
+    // Restore non-secure conf
+    UserGroupInformation.setConfiguration(new Configuration());
+    RollingFileSystemSink.suppliedConf = null;
+    RollingFileSystemSink.suppliedFilesystem = null;
+  }
 
-      // Restore non-secure conf
-      UserGroupInformation.setConfiguration(new Configuration());
-      RollingFileSystemSink.suppliedConf = null;
-      RollingFileSystemSink.suppliedFilesystem = null;
+  /**
+   * Stop the mini-KDC.
+   */
+  @AfterClass
+  public static void shutdownKdc() {
+    if (kdc != null) {
+      kdc.stop();
     }
   }
 
+  /**
+   * Do a basic write test against an HDFS cluster with Kerberos enabled. We
+   * assume that if a basic write succeeds, more complex operations will also
+   * succeed.
+   *
+   * @throws Exception thrown if things break
+   */
+  @Test
+  public void testWithSecureHDFS() throws Exception {
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
+    final MetricsSystem ms =
+        initMetricsSystem(path, true, false, true);
+
+    assertMetricsContents(
+        sink.doAs(new PrivilegedExceptionAction<String>() {
+          @Override
+          public String run() throws Exception {
+            return doWriteTest(ms, path, 1);
+          }
+        }));
+  }
+
   /**
    * Do a basic write test against an HDFS cluster with Kerberos enabled but
    * without the principal and keytab properties set.
@@ -126,54 +170,25 @@ public class TestRollingFileSystemSinkWithSecureHdfs
    */
   @Test
   public void testMissingPropertiesWithSecureHDFS() throws Exception {
-    RollingFileSystemSink.flushQuickly = false;
-    RollingFileSystemSink.hasFlushed = false;
-    initKdc();
-
-    MiniDFSCluster cluster = null;
-
-    try {
-      HdfsConfiguration conf = createSecureConfig("authentication,privacy");
-
-      RollingFileSystemSink.suppliedConf = conf;
-
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
-          .build();
-
-      final String path =
-          "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
+    final String path =
+        "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp/test";
 
-      createDirectoriesSecurely(cluster);
-      initMetricsSystem(path, true, false);
+    initMetricsSystem(path, true, false);
 
-      assertTrue("No exception was generated initializing the sink against a "
-          + "secure cluster even though the principal and keytab properties "
-          + "were missing", MockSink.errored);
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
-
-      shutdownKdc();
-
-      // Restore non-secure conf
-      UserGroupInformation.setConfiguration(new Configuration());
-      RollingFileSystemSink.suppliedConf = null;
-      RollingFileSystemSink.suppliedFilesystem = null;
-    }
+    assertTrue("No exception was generated initializing the sink against a "
+        + "secure cluster even though the principal and keytab properties "
+        + "were missing", MockSink.errored);
   }
 
   /**
    * Create the /tmp directory as <i>hdfs</i> and /tmp/test as <i>sink</i> and
    * store the UGI for <i>sink</i> in the <code>sink</code> field.
    *
-   * @param cluster the mini-cluster
-   * @return the UGI for <i>sink</i>
    * @throws IOException thrown if login or directory creation fails
    * @throws InterruptedException thrown if interrupted while creating a
    * file system handle
    */
-  protected UserGroupInformation createDirectoriesSecurely(final MiniDFSCluster cluster)
+  protected void createDirectoriesSecurely()
       throws IOException, InterruptedException {
     Path tmp = new Path("/tmp");
     Path test = new Path(tmp, "test");
@@ -192,9 +207,9 @@ public class TestRollingFileSystemSinkWithSecureHdfs
     fsForSuperUser.mkdirs(tmp);
     fsForSuperUser.setPermission(tmp, new FsPermission((short)0777));
 
-    UserGroupInformation sink =
-        UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal,
+    sink = UserGroupInformation.loginUserFromKeytabAndReturnUGI(sinkPrincipal,
             sinkKeytab);
+
     FileSystem fsForSink =
         sink.doAs(new PrivilegedExceptionAction<FileSystem>() {
           @Override
@@ -205,40 +220,6 @@ public class TestRollingFileSystemSinkWithSecureHdfs
 
     fsForSink.mkdirs(test);
     RollingFileSystemSink.suppliedFilesystem = fsForSink;
-
-    return sink;
-  }
-
-  /**
-   * Setup the KDC for testing a secure HDFS cluster
-   *
-   * @throws Exception thrown if the KDC setup fails
-   */
-  public static void initKdc() throws Exception {
-    Properties kdcConf = MiniKdc.createConf();
-    kdc = new MiniKdc(kdcConf, methodDir);
-    kdc.start();
-
-    File sinkKeytabFile = new File(methodDir, "sink.keytab");
-    sinkKeytab = sinkKeytabFile.getAbsolutePath();
-    kdc.createPrincipal(sinkKeytabFile, "sink/localhost");
-    sinkPrincipal = "sink/localhost@" + kdc.getRealm();
-
-    File hdfsKeytabFile = new File(methodDir, "hdfs.keytab");
-    hdfsKeytab = hdfsKeytabFile.getAbsolutePath();
-    kdc.createPrincipal(hdfsKeytabFile, "hdfs/localhost",
-        "HTTP/localhost");
-    hdfsPrincipal = "hdfs/localhost@" + kdc.getRealm();
-    spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
-  }
-
-  /**
-   * Stop the mini-KDC.
-   */
-  public static void shutdownKdc() {
-    if (kdc != null) {
-      kdc.stop();
-    }
   }
 
   /**