|
@@ -0,0 +1,287 @@
|
|
|
+/*
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.metrics2.sink;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.Calendar;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSystem;
|
|
|
+import org.apache.hadoop.metrics2.sink.RollingFileSystemSinkTestBase.MyMetrics1;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Test;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import org.junit.Before;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Test the {@link RollingFileSystemSink} class in the context of HDFS.
|
|
|
+ */
|
|
|
+public class TestRollingFileSystemSinkWithHdfs
|
|
|
+ extends RollingFileSystemSinkTestBase {
|
|
|
+ private static final int NUM_DATANODES = 4;
|
|
|
+ private MiniDFSCluster cluster;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a {@link MiniDFSCluster} instance with four nodes. The
|
|
|
+ * node count is required to allow append to function. Also clear the
|
|
|
+ * sink's test flags.
|
|
|
+ *
|
|
|
+ * @throws IOException thrown if cluster creation fails
|
|
|
+ */
|
|
|
+ @Before
|
|
|
+ public void setupHdfs() throws IOException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ // It appears that since HDFS-265, append is always enabled.
|
|
|
+ cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
|
|
+
|
|
|
+ // Also clear sink flags
|
|
|
+ RollingFileSystemSink.isTest = false;
|
|
|
+ RollingFileSystemSink.hasFlushed = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Stop the {@link MiniDFSCluster}.
|
|
|
+ */
|
|
|
+ @After
|
|
|
+ public void shutdownHdfs() {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test writing logs to HDFS.
|
|
|
+ *
|
|
|
+ * @throws Exception thrown when things break
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testWrite() throws Exception {
|
|
|
+ String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+ MetricsSystem ms = initMetricsSystem(path, false, true);
|
|
|
+
|
|
|
+ assertMetricsContents(doWriteTest(ms, path, 1));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test writing logs to HDFS if append is enabled and the log file already
|
|
|
+ * exists.
|
|
|
+ *
|
|
|
+ * @throws Exception thrown when things break
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testAppend() throws Exception {
|
|
|
+ String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+
|
|
|
+ assertExtraContents(doAppendTest(path, false, true, 1));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test writing logs to HDFS if append is enabled, the log file already
|
|
|
+ * exists, and the sink is set to ignore errors.
|
|
|
+ *
|
|
|
+ * @throws Exception thrown when things break
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSilentAppend() throws Exception {
|
|
|
+ String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+
|
|
|
+ assertExtraContents(doAppendTest(path, false, true, 1));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test writing logs to HDFS without append enabled, when the log file already
|
|
|
+ * exists.
|
|
|
+ *
|
|
|
+ * @throws Exception thrown when things break
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testNoAppend() throws Exception {
|
|
|
+ String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+
|
|
|
+ assertMetricsContents(doAppendTest(path, false, false, 2));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test writing logs to HDFS without append enabled, with ignore errors
|
|
|
+ * enabled, and when the log file already exists.
|
|
|
+ *
|
|
|
+ * @throws Exception thrown when things break
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSilentOverwrite() throws Exception {
|
|
|
+ String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+
|
|
|
+ assertMetricsContents(doAppendTest(path, true, false, 2));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test that writing to HDFS fails when HDFS is unavailable.
|
|
|
+ *
|
|
|
+ * @throws IOException thrown when reading or writing log files
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testFailedWrite() throws IOException {
|
|
|
+ final String path =
|
|
|
+ "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+ MetricsSystem ms = initMetricsSystem(path, false, false);
|
|
|
+
|
|
|
+ new MyMetrics1().registerWith(ms);
|
|
|
+
|
|
|
+ shutdownHdfs();
|
|
|
+ ErrorSink.errored = false;
|
|
|
+
|
|
|
+ ms.publishMetricsNow(); // publish the metrics
|
|
|
+
|
|
|
+ assertTrue("No exception was generated while writing metrics "
|
|
|
+ + "even though HDFS was unavailable", ErrorSink.errored);
|
|
|
+
|
|
|
+ ms.stop();
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test that closing a file in HDFS fails when HDFS is unavailable.
|
|
|
+ *
|
|
|
+ * @throws IOException thrown when reading or writing log files
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testFailedClose() throws IOException {
|
|
|
+ final String path =
|
|
|
+ "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+ MetricsSystem ms = initMetricsSystem(path, false, false);
|
|
|
+
|
|
|
+ new MyMetrics1().registerWith(ms);
|
|
|
+
|
|
|
+ ms.publishMetricsNow(); // publish the metrics
|
|
|
+
|
|
|
+ shutdownHdfs();
|
|
|
+ ErrorSink.errored = false;
|
|
|
+
|
|
|
+ ms.stop();
|
|
|
+
|
|
|
+ assertTrue("No exception was generated while stopping sink "
|
|
|
+ + "even though HDFS was unavailable", ErrorSink.errored);
|
|
|
+
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test that writing to HDFS fails silently when HDFS is unavailable.
|
|
|
+ *
|
|
|
+ * @throws IOException thrown when reading or writing log files
|
|
|
+ * @throws java.lang.InterruptedException thrown if interrupted
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSilentFailedWrite() throws IOException, InterruptedException {
|
|
|
+ final String path =
|
|
|
+ "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+ MetricsSystem ms = initMetricsSystem(path, true, false);
|
|
|
+
|
|
|
+ new MyMetrics1().registerWith(ms);
|
|
|
+
|
|
|
+ shutdownHdfs();
|
|
|
+ ErrorSink.errored = false;
|
|
|
+
|
|
|
+ ms.publishMetricsNow(); // publish the metrics
|
|
|
+
|
|
|
+ assertFalse("An exception was generated writing metrics "
|
|
|
+ + "while HDFS was unavailable, even though the sink is set to "
|
|
|
+ + "ignore errors", ErrorSink.errored);
|
|
|
+
|
|
|
+ ms.stop();
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test that closing a file in HDFS silently fails when HDFS is unavailable.
|
|
|
+ *
|
|
|
+ * @throws IOException thrown when reading or writing log files
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testSilentFailedClose() throws IOException {
|
|
|
+ final String path =
|
|
|
+ "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+ MetricsSystem ms = initMetricsSystem(path, true, false);
|
|
|
+
|
|
|
+ new MyMetrics1().registerWith(ms);
|
|
|
+
|
|
|
+ ms.publishMetricsNow(); // publish the metrics
|
|
|
+
|
|
|
+ shutdownHdfs();
|
|
|
+ ErrorSink.errored = false;
|
|
|
+
|
|
|
+ ms.stop();
|
|
|
+
|
|
|
+ assertFalse("An exception was generated stopping sink "
|
|
|
+ + "while HDFS was unavailable, even though the sink is set to "
|
|
|
+ + "ignore errors", ErrorSink.errored);
|
|
|
+
|
|
|
+ ms.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This test specifically checks whether the flusher thread is automatically
|
|
|
+ * flushing the files. It unfortunately can only test with the alternative
|
|
|
+ * flushing schedule (because of test timing), but it's better than nothing.
|
|
|
+ *
|
|
|
+ * @throws Exception thrown if something breaks
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testFlushThread() throws Exception {
|
|
|
+ RollingFileSystemSink.isTest = true;
|
|
|
+ RollingFileSystemSink.hasFlushed = false;
|
|
|
+
|
|
|
+ String path = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/tmp";
|
|
|
+ MetricsSystem ms = initMetricsSystem(path, true, false);
|
|
|
+
|
|
|
+ new MyMetrics1().registerWith(ms);
|
|
|
+
|
|
|
+ // Publish the metrics
|
|
|
+ ms.publishMetricsNow();
|
|
|
+ // Pubish again because the first write seems to get properly flushed
|
|
|
+ // regardless.
|
|
|
+ ms.publishMetricsNow();
|
|
|
+
|
|
|
+ // Sleep until the flusher has run
|
|
|
+ while (!RollingFileSystemSink.hasFlushed) {
|
|
|
+ Thread.sleep(50L);
|
|
|
+ }
|
|
|
+
|
|
|
+ Calendar now = getNowNotTopOfHour();
|
|
|
+ FileSystem fs = FileSystem.newInstance(new URI(path), new Configuration());
|
|
|
+ Path currentDir = new Path(path, DATE_FORMAT.format(now.getTime()));
|
|
|
+ Path currentFile =
|
|
|
+ findMostRecentLogFile(fs, new Path(currentDir, getLogFilename()));
|
|
|
+ FileStatus status = fs.getFileStatus(currentFile);
|
|
|
+
|
|
|
+ // Each metrics record is 118+ bytes, depending on hostname
|
|
|
+ assertTrue("The flusher thread didn't flush the log contents. Expected "
|
|
|
+ + "at least 236 bytes in the log file, but got " + status.getLen(),
|
|
|
+ status.getLen() >= 236);
|
|
|
+
|
|
|
+ ms.stop();
|
|
|
+ }
|
|
|
+}
|