|
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|
|
|
|
|
import java.io.InputStream;
|
|
|
import java.io.OutputStream;
|
|
|
+import java.lang.management.ManagementFactory;
|
|
|
import java.nio.file.Files;
|
|
|
import java.nio.file.Paths;
|
|
|
import java.util.Random;
|
|
@@ -131,6 +132,14 @@ import static org.mockito.Mockito.when;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import javax.management.AttributeNotFoundException;
|
|
|
+import javax.management.InstanceNotFoundException;
|
|
|
+import javax.management.MBeanException;
|
|
|
+import javax.management.MBeanServer;
|
|
|
+import javax.management.MalformedObjectNameException;
|
|
|
+import javax.management.ObjectName;
|
|
|
+import javax.management.ReflectionException;
|
|
|
+
|
|
|
public class TestFsDatasetImpl {
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(
|
|
@@ -1842,7 +1851,8 @@ public class TestFsDatasetImpl {
|
|
|
*/
|
|
|
@Test
|
|
|
public void testAysncDiskServiceDeleteReplica()
|
|
|
- throws IOException, InterruptedException, TimeoutException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException, MalformedObjectNameException,
|
|
|
+ ReflectionException, AttributeNotFoundException, InstanceNotFoundException, MBeanException {
|
|
|
HdfsConfiguration config = new HdfsConfiguration();
|
|
|
// Bump up replication interval.
|
|
|
config.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
|
|
@@ -1896,6 +1906,17 @@ public class TestFsDatasetImpl {
|
|
|
// If this replica is deleted from memory, the client would got an ReplicaNotFoundException.
|
|
|
assertNotNull(ds.getStoredBlock(bpid, extendedBlock.getBlockId()));
|
|
|
|
|
|
+ assertEquals(1, ds.asyncDiskService.countPendingDeletions());
|
|
|
+ assertEquals(1, ds.getPendingAsyncDeletions());
|
|
|
+
|
|
|
+ // Validate PendingAsyncDeletions metrics.
|
|
|
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
|
|
+ ObjectName mxbeanName = new ObjectName(
|
|
|
+ "Hadoop:service=DataNode,name=FSDatasetState-" + dn.getDatanodeUuid());
|
|
|
+ long pendingAsyncDeletions = (long) mbs.getAttribute(mxbeanName,
|
|
|
+ "PendingAsyncDeletions");
|
|
|
+ assertEquals(1, pendingAsyncDeletions);
|
|
|
+
|
|
|
// Make it resume the removeReplicaFromMem method.
|
|
|
semaphore.release(1);
|
|
|
|
|
@@ -1903,6 +1924,12 @@ public class TestFsDatasetImpl {
|
|
|
GenericTestUtils.waitFor(() ->
|
|
|
ds.asyncDiskService.countPendingDeletions() == 0, 100, 1000);
|
|
|
|
|
|
+ assertEquals(0, ds.getPendingAsyncDeletions());
|
|
|
+
|
|
|
+ pendingAsyncDeletions = (long) mbs.getAttribute(mxbeanName,
|
|
|
+ "PendingAsyncDeletions");
|
|
|
+ assertEquals(0, pendingAsyncDeletions);
|
|
|
+
|
|
|
// Sleep for two heartbeat times.
|
|
|
Thread.sleep(config.getTimeDuration(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
|
|
|
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT,
|