|
@@ -31,19 +31,18 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
|
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
|
+import org.apache.hadoop.hdfs.tools.JMXGet;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
|
|
-import java.io.File;
|
|
|
|
-import java.io.IOException;
|
|
|
|
|
|
+import java.io.*;
|
|
import java.util.*;
|
|
import java.util.*;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.CountDownLatch;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
@@ -58,6 +57,7 @@ import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
|
import static org.hamcrest.core.Is.is;
|
|
import static org.hamcrest.core.Is.is;
|
|
import static org.hamcrest.core.IsNot.not;
|
|
import static org.hamcrest.core.IsNot.not;
|
|
import static org.junit.Assert.assertThat;
|
|
import static org.junit.Assert.assertThat;
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.fail;
|
|
import static org.junit.Assert.fail;
|
|
|
|
|
|
public class TestLazyPersistFiles {
|
|
public class TestLazyPersistFiles {
|
|
@@ -81,14 +81,21 @@ public class TestLazyPersistFiles {
|
|
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
|
|
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
|
|
private static final int BUFFER_LENGTH = 4096;
|
|
private static final int BUFFER_LENGTH = 4096;
|
|
private static final int EVICTION_LOW_WATERMARK = 1;
|
|
private static final int EVICTION_LOW_WATERMARK = 1;
|
|
|
|
+ private static final String JMX_SERVICE_NAME = "DataNode";
|
|
|
|
+ private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
|
|
|
|
|
|
private MiniDFSCluster cluster;
|
|
private MiniDFSCluster cluster;
|
|
private DistributedFileSystem fs;
|
|
private DistributedFileSystem fs;
|
|
private DFSClient client;
|
|
private DFSClient client;
|
|
private Configuration conf;
|
|
private Configuration conf;
|
|
|
|
+ private JMXGet jmx;
|
|
|
|
|
|
@After
|
|
@After
|
|
- public void shutDownCluster() throws IOException {
|
|
|
|
|
|
+ public void shutDownCluster() throws Exception {
|
|
|
|
+
|
|
|
|
+ // Dump all RamDisk JMX metrics before shutdown the cluster
|
|
|
|
+ printRamDiskJMXMetrics();
|
|
|
|
+
|
|
if (fs != null) {
|
|
if (fs != null) {
|
|
fs.close();
|
|
fs.close();
|
|
fs = null;
|
|
fs = null;
|
|
@@ -100,6 +107,10 @@ public class TestLazyPersistFiles {
|
|
cluster.shutdown();
|
|
cluster.shutdown();
|
|
cluster = null;
|
|
cluster = null;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if (jmx != null) {
|
|
|
|
+ jmx = null;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout=300000)
|
|
@Test (timeout=300000)
|
|
@@ -203,13 +214,15 @@ public class TestLazyPersistFiles {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
@Test (timeout=300000)
|
|
@Test (timeout=300000)
|
|
- public void testFallbackToDiskFull() throws IOException {
|
|
|
|
|
|
+ public void testFallbackToDiskFull() throws Exception {
|
|
startUpCluster(false, 0);
|
|
startUpCluster(false, 0);
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
|
|
|
|
|
makeTestFile(path, BLOCK_SIZE, true);
|
|
makeTestFile(path, BLOCK_SIZE, true);
|
|
ensureFileReplicasOnStorageType(path, DEFAULT);
|
|
ensureFileReplicasOnStorageType(path, DEFAULT);
|
|
|
|
+
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -384,11 +397,10 @@ public class TestLazyPersistFiles {
|
|
|
|
|
|
/**
|
|
/**
|
|
* RamDisk eviction after lazy persist to disk.
|
|
* RamDisk eviction after lazy persist to disk.
|
|
- * @throws IOException
|
|
|
|
- * @throws InterruptedException
|
|
|
|
|
|
+ * @throws Exception
|
|
*/
|
|
*/
|
|
@Test (timeout=300000)
|
|
@Test (timeout=300000)
|
|
- public void testRamDiskEviction() throws IOException, InterruptedException {
|
|
|
|
|
|
+ public void testRamDiskEviction() throws Exception {
|
|
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
|
|
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
|
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
|
@@ -411,6 +423,9 @@ public class TestLazyPersistFiles {
|
|
// RAM_DISK.
|
|
// RAM_DISK.
|
|
ensureFileReplicasOnStorageType(path2, RAM_DISK);
|
|
ensureFileReplicasOnStorageType(path2, RAM_DISK);
|
|
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
|
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
|
|
|
+
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -454,7 +469,7 @@ public class TestLazyPersistFiles {
|
|
*/
|
|
*/
|
|
@Test (timeout=300000)
|
|
@Test (timeout=300000)
|
|
public void testRamDiskEvictionIsLru()
|
|
public void testRamDiskEvictionIsLru()
|
|
- throws IOException, InterruptedException {
|
|
|
|
|
|
+ throws Exception {
|
|
final int NUM_PATHS = 5;
|
|
final int NUM_PATHS = 5;
|
|
startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
|
|
startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
@@ -499,6 +514,14 @@ public class TestLazyPersistFiles {
|
|
ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
|
|
ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksWrite", NUM_PATHS * 2);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 0);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBytesWrite", BLOCK_SIZE * NUM_PATHS * 2);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksReadHits", NUM_PATHS);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", NUM_PATHS);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 0);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 0);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -506,9 +529,9 @@ public class TestLazyPersistFiles {
|
|
* Memory is freed up and file is gone.
|
|
* Memory is freed up and file is gone.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- @Test (timeout=300000)
|
|
|
|
|
|
+ @Test // (timeout=300000)
|
|
public void testDeleteBeforePersist()
|
|
public void testDeleteBeforePersist()
|
|
- throws IOException, InterruptedException {
|
|
|
|
|
|
+ throws Exception {
|
|
startUpCluster(true, -1);
|
|
startUpCluster(true, -1);
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
|
|
FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
|
|
@@ -523,6 +546,8 @@ public class TestLazyPersistFiles {
|
|
Assert.assertFalse(fs.exists(path));
|
|
Assert.assertFalse(fs.exists(path));
|
|
|
|
|
|
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
|
|
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
|
|
|
|
+
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -533,7 +558,7 @@ public class TestLazyPersistFiles {
|
|
*/
|
|
*/
|
|
@Test (timeout=300000)
|
|
@Test (timeout=300000)
|
|
public void testDeleteAfterPersist()
|
|
public void testDeleteAfterPersist()
|
|
- throws IOException, InterruptedException {
|
|
|
|
|
|
+ throws Exception {
|
|
startUpCluster(true, -1);
|
|
startUpCluster(true, -1);
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
|
Path path = new Path("/" + METHOD_NAME + ".dat");
|
|
@@ -548,9 +573,10 @@ public class TestLazyPersistFiles {
|
|
client.delete(path.toString(), false);
|
|
client.delete(path.toString(), false);
|
|
Assert.assertFalse(fs.exists(path));
|
|
Assert.assertFalse(fs.exists(path));
|
|
|
|
|
|
- triggerBlockReport();
|
|
|
|
-
|
|
|
|
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
|
|
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
|
|
|
|
+
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
|
|
|
|
+ verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -760,6 +786,11 @@ public class TestLazyPersistFiles {
|
|
.build();
|
|
.build();
|
|
fs = cluster.getFileSystem();
|
|
fs = cluster.getFileSystem();
|
|
client = fs.getClient();
|
|
client = fs.getClient();
|
|
|
|
+ try {
|
|
|
|
+ jmx = initJMX();
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ fail("Failed initialize JMX for testing: " + e);
|
|
|
|
+ }
|
|
LOG.info("Cluster startup complete");
|
|
LOG.info("Cluster startup complete");
|
|
}
|
|
}
|
|
|
|
|
|
@@ -929,4 +960,25 @@ public class TestLazyPersistFiles {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ JMXGet initJMX() throws Exception
|
|
|
|
+ {
|
|
|
|
+ JMXGet jmx = new JMXGet();
|
|
|
|
+ jmx.setService(JMX_SERVICE_NAME);
|
|
|
|
+ jmx.init();
|
|
|
|
+ return jmx;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void printRamDiskJMXMetrics() {
|
|
|
|
+ try {
|
|
|
|
+ jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void verifyRamDiskJMXMetric(String metricName, long expectedValue)
|
|
|
|
+ throws Exception {
|
|
|
|
+ assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
|
|
|
|
+ }
|
|
}
|
|
}
|