|
@@ -27,6 +27,7 @@ import static org.junit.Assert.*;
|
|
import java.io.Closeable;
|
|
import java.io.Closeable;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.FileNotFoundException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.io.InputStream;
|
|
import java.lang.management.ManagementFactory;
|
|
import java.lang.management.ManagementFactory;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
@@ -419,6 +420,57 @@ public class TestDataNodeMetrics {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testDataNodeMXBeanActiveThreadCount() throws Exception {
|
|
|
|
+ Configuration conf = new Configuration();
|
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
|
|
|
+ FileSystem fs = cluster.getFileSystem();
|
|
|
|
+ Path p = new Path("/testfile");
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ List<DataNode> datanodes = cluster.getDataNodes();
|
|
|
|
+ assertEquals(1, datanodes.size());
|
|
|
|
+ DataNode datanode = datanodes.get(0);
|
|
|
|
+
|
|
|
|
+ // create a xceiver thread for write
|
|
|
|
+ FSDataOutputStream os = fs.create(p);
|
|
|
|
+ for (int i = 0; i < 1024; i++) {
|
|
|
|
+ os.write("testdatastr".getBytes());
|
|
|
|
+ }
|
|
|
|
+ os.hsync();
|
|
|
|
+ // create a xceiver thread for read
|
|
|
|
+ InputStream is = fs.open(p);
|
|
|
|
+ is.read(new byte[16], 0, 4);
|
|
|
|
+
|
|
|
|
+ int threadCount = datanode.threadGroup.activeCount();
|
|
|
|
+ assertTrue(threadCount > 0);
|
|
|
|
+ Thread[] threads = new Thread[threadCount];
|
|
|
|
+ datanode.threadGroup.enumerate(threads);
|
|
|
|
+ int xceiverCount = 0;
|
|
|
|
+ int responderCount = 0;
|
|
|
|
+ int recoveryWorkerCount = 0;
|
|
|
|
+ for (Thread t : threads) {
|
|
|
|
+ if (t.getName().contains("DataXceiver for client")) {
|
|
|
|
+ xceiverCount++;
|
|
|
|
+ } else if (t.getName().contains("PacketResponder")) {
|
|
|
|
+ responderCount++;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertEquals(2, xceiverCount);
|
|
|
|
+ assertEquals(1, responderCount);
|
|
|
|
+ assertEquals(0, recoveryWorkerCount); //not easy to produce
|
|
|
|
+ assertEquals(xceiverCount, datanode.getXceiverCount());
|
|
|
|
+ assertEquals(xceiverCount + responderCount + recoveryWorkerCount,
|
|
|
|
+ datanode.getActiveTransferThreadCount());
|
|
|
|
+
|
|
|
|
+ is.close();
|
|
|
|
+ } finally {
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testDNShouldNotDeleteBlockONTooManyOpenFiles()
|
|
public void testDNShouldNotDeleteBlockONTooManyOpenFiles()
|
|
throws Exception {
|
|
throws Exception {
|