|
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.metrics2.MetricsException;
|
|
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
|
|
import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
|
|
|
import org.junit.After;
|
|
@@ -108,7 +109,7 @@ public class TestRollingFileSystemSinkWithHdfs
|
|
|
public void testSilentAppend() throws Exception {
|
|
|
String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
|
|
|
- assertExtraContents(doAppendTest(path, false, true, 1));
|
|
|
+ assertExtraContents(doAppendTest(path, true, true, 1));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -158,8 +159,11 @@ public class TestRollingFileSystemSinkWithHdfs
|
|
|
assertTrue("No exception was generated while writing metrics "
|
|
|
+ "even though HDFS was unavailable", MockSink.errored);
|
|
|
|
|
|
- ms.stop();
|
|
|
- ms.shutdown();
|
|
|
+ try {
|
|
|
+ ms.stop();
|
|
|
+ } finally {
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -180,12 +184,16 @@ public class TestRollingFileSystemSinkWithHdfs
|
|
|
shutdownHdfs();
|
|
|
MockSink.errored = false;
|
|
|
|
|
|
- ms.stop();
|
|
|
-
|
|
|
- assertTrue("No exception was generated while stopping sink "
|
|
|
- + "even though HDFS was unavailable", MockSink.errored);
|
|
|
+ try {
|
|
|
+ ms.stop();
|
|
|
|
|
|
- ms.shutdown();
|
|
|
+ assertTrue("No exception was generated while stopping sink "
|
|
|
+ + "even though HDFS was unavailable", MockSink.errored);
|
|
|
+ } catch (MetricsException ex) {
|
|
|
+ // Expected
|
|
|
+ } finally {
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -211,8 +219,11 @@ public class TestRollingFileSystemSinkWithHdfs
|
|
|
+ "while HDFS was unavailable, even though the sink is set to "
|
|
|
+ "ignore errors", MockSink.errored);
|
|
|
|
|
|
- ms.stop();
|
|
|
- ms.shutdown();
|
|
|
+ try {
|
|
|
+ ms.stop();
|
|
|
+ } finally {
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -233,13 +244,15 @@ public class TestRollingFileSystemSinkWithHdfs
|
|
|
shutdownHdfs();
|
|
|
MockSink.errored = false;
|
|
|
|
|
|
- ms.stop();
|
|
|
+ try {
|
|
|
+ ms.stop();
|
|
|
|
|
|
- assertFalse("An exception was generated stopping sink "
|
|
|
- + "while HDFS was unavailable, even though the sink is set to "
|
|
|
- + "ignore errors", MockSink.errored);
|
|
|
-
|
|
|
- ms.shutdown();
|
|
|
+ assertFalse("An exception was generated stopping sink "
|
|
|
+ + "while HDFS was unavailable, even though the sink is set to "
|
|
|
+ + "ignore errors", MockSink.errored);
|
|
|
+ } finally {
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -268,29 +281,37 @@ public class TestRollingFileSystemSinkWithHdfs
|
|
|
|
|
|
int count = 0;
|
|
|
|
|
|
- // Sleep until the flusher has run. This should never actually need to
|
|
|
- // sleep, but the sleep is here to make sure this test isn't flakey.
|
|
|
- while (!RollingFileSystemSink.hasFlushed) {
|
|
|
- Thread.sleep(10L);
|
|
|
+ try {
|
|
|
+ // Sleep until the flusher has run. This should never actually need to
|
|
|
+ // sleep, but the sleep is here to make sure this test isn't flakey.
|
|
|
+ while (!RollingFileSystemSink.hasFlushed) {
|
|
|
+ Thread.sleep(10L);
|
|
|
|
|
|
- if (++count > 1000) {
|
|
|
- fail("Flush thread did not run within 10 seconds");
|
|
|
+ if (++count > 1000) {
|
|
|
+ fail("Flush thread did not run within 10 seconds");
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- Calendar now = Calendar.getInstance();
|
|
|
- Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
|
|
|
- FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
|
|
|
- Path currentFile =
|
|
|
- findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
|
|
|
- FileStatus status = fs.getFileStatus(currentFile);
|
|
|
|
|
|
- // Each metrics record is 118+ bytes, depending on hostname
|
|
|
- assertTrue("The flusher thread didn't flush the log contents. Expected "
|
|
|
- + "at least 236 bytes in the log file, but got " + status.getLen(),
|
|
|
- status.getLen() >= 236);
|
|
|
-
|
|
|
- ms.stop();
|
|
|
+ Calendar now = Calendar.getInstance();
|
|
|
+ Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()) + "00");
|
|
|
+ FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
|
|
|
+ Path currentFile =
|
|
|
+ findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
|
|
|
+ FileStatus status = fs.getFileStatus(currentFile);
|
|
|
+
|
|
|
+ // Each metrics record is 118+ bytes, depending on hostname
|
|
|
+ assertTrue("The flusher thread didn't flush the log contents. Expected "
|
|
|
+ + "at least 236 bytes in the log file, but got " + status.getLen(),
|
|
|
+ status.getLen() >= 236);
|
|
|
+ } finally {
|
|
|
+ RollingFileSystemSink.forceFlush = false;
|
|
|
+
|
|
|
+ try {
|
|
|
+ ms.stop();
|
|
|
+ } finally {
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|