|
@@ -31,12 +31,17 @@ import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.Queue;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_DEAD_NODE_QUEUE_MAX_KEY;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY;
|
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
|
|
|
/**
|
|
@@ -53,9 +58,15 @@ public class TestDeadNodeDetection {
|
|
|
conf = new HdfsConfiguration();
|
|
|
conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true);
|
|
|
conf.setLong(
|
|
|
- DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, 1000);
|
|
|
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY,
|
|
|
+ 1000);
|
|
|
conf.setLong(
|
|
|
- DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, 100);
|
|
|
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY,
|
|
|
+ 100);
|
|
|
+ conf.setLong(
|
|
|
+ DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY,
|
|
|
+ 1000);
|
|
|
+ conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0);
|
|
|
}
|
|
|
|
|
|
@After
|
|
@@ -67,6 +78,7 @@ public class TestDeadNodeDetection {
|
|
|
|
|
|
@Test
|
|
|
public void testDeadNodeDetectionInBackground() throws Exception {
|
|
|
+ conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionInBackground");
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
|
|
cluster.waitActive();
|
|
|
|
|
@@ -102,7 +114,10 @@ public class TestDeadNodeDetection {
|
|
|
} catch (BlockMissingException e) {
|
|
|
}
|
|
|
|
|
|
- waitForDeadNode(dfsClient, 3);
|
|
|
+ DefaultCoordination defaultCoordination = new DefaultCoordination();
|
|
|
+ defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3);
|
|
|
+ defaultCoordination.sync();
|
|
|
+
|
|
|
assertEquals(3, dfsClient.getDeadNodes(din).size());
|
|
|
assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
|
|
|
.clearAndGetDetectedDeadNodes().size());
|
|
@@ -143,6 +158,10 @@ public class TestDeadNodeDetection {
|
|
|
|
|
|
din2 = (DFSInputStream) in2.getWrappedStream();
|
|
|
dfsClient2 = din2.getDFSClient();
|
|
|
+
|
|
|
+ DefaultCoordination defaultCoordination = new DefaultCoordination();
|
|
|
+ defaultCoordination.startWaitForDeadNodeThread(dfsClient2, 1);
|
|
|
+ defaultCoordination.sync();
|
|
|
assertEquals(dfsClient1.toString(), dfsClient2.toString());
|
|
|
assertEquals(1, dfsClient1.getDeadNodes(din1).size());
|
|
|
assertEquals(1, dfsClient2.getDeadNodes(din2).size());
|
|
@@ -173,9 +192,13 @@ public class TestDeadNodeDetection {
|
|
|
|
|
|
@Test
|
|
|
public void testDeadNodeDetectionDeadNodeRecovery() throws Exception {
|
|
|
+ // prevent interrupt deadNodeDetectorThr in cluster.waitActive()
|
|
|
+ DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(true);
|
|
|
+ conf.set(DFS_CLIENT_CONTEXT, "testDeadNodeDetectionDeadNodeRecovery");
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
|
|
cluster.waitActive();
|
|
|
|
|
|
+ DFSClient.setDisabledStopDeadNodeDetectorThreadForTest(false);
|
|
|
FileSystem fs = cluster.getFileSystem();
|
|
|
Path filePath = new Path("/testDeadNodeDetectionDeadNodeRecovery");
|
|
|
createFile(fs, filePath);
|
|
@@ -193,14 +216,18 @@ public class TestDeadNodeDetection {
|
|
|
in.read();
|
|
|
} catch (BlockMissingException e) {
|
|
|
}
|
|
|
-
|
|
|
- waitForDeadNode(dfsClient, 3);
|
|
|
+ DefaultCoordination defaultCoordination = new DefaultCoordination();
|
|
|
+ defaultCoordination.startWaitForDeadNodeThread(dfsClient, 3);
|
|
|
+ defaultCoordination.sync();
|
|
|
assertEquals(3, dfsClient.getDeadNodes(din).size());
|
|
|
assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector()
|
|
|
.clearAndGetDetectedDeadNodes().size());
|
|
|
|
|
|
cluster.restartDataNode(one, true);
|
|
|
- waitForDeadNode(dfsClient, 2);
|
|
|
+
|
|
|
+ defaultCoordination = new DefaultCoordination();
|
|
|
+ defaultCoordination.startWaitForDeadNodeThread(dfsClient, 2);
|
|
|
+ defaultCoordination.sync();
|
|
|
assertEquals(2, dfsClient.getDeadNodes(din).size());
|
|
|
assertEquals(2, dfsClient.getClientContext().getDeadNodeDetector()
|
|
|
.clearAndGetDetectedDeadNodes().size());
|
|
@@ -250,7 +277,7 @@ public class TestDeadNodeDetection {
|
|
|
@Test
|
|
|
public void testDeadNodeDetectionSuspectNode() throws Exception {
|
|
|
conf.setInt(DFS_CLIENT_DEAD_NODE_DETECTION_SUSPECT_NODE_QUEUE_MAX_KEY, 1);
|
|
|
- DeadNodeDetector.disabledProbeThreadForTest();
|
|
|
+ DeadNodeDetector.setDisabledProbeThreadForTest(true);
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
cluster.waitActive();
|
|
|
|
|
@@ -288,6 +315,8 @@ public class TestDeadNodeDetection {
|
|
|
assertEquals(0, dfsClient.getDeadNodes(din).size());
|
|
|
assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector()
|
|
|
.clearAndGetDetectedDeadNodes().size());
|
|
|
+ // reset disabledProbeThreadForTest
|
|
|
+ DeadNodeDetector.setDisabledProbeThreadForTest(false);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -317,13 +346,13 @@ public class TestDeadNodeDetection {
|
|
|
fs.delete(filePath, true);
|
|
|
}
|
|
|
|
|
|
- private void waitForDeadNode(DFSClient dfsClient, int size) throws Exception {
|
|
|
+ private void waitForSuspectNode(DFSClient dfsClient) throws Exception {
|
|
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
@Override
|
|
|
public Boolean get() {
|
|
|
try {
|
|
|
if (dfsClient.getClientContext().getDeadNodeDetector()
|
|
|
- .clearAndGetDetectedDeadNodes().size() == size) {
|
|
|
+ .getSuspectNodesProbeQueue().size() > 0) {
|
|
|
return true;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
@@ -332,24 +361,41 @@ public class TestDeadNodeDetection {
|
|
|
|
|
|
return false;
|
|
|
}
|
|
|
- }, 5000, 100000);
|
|
|
+ }, 500, 5000);
|
|
|
}
|
|
|
|
|
|
- private void waitForSuspectNode(DFSClient dfsClient) throws Exception {
|
|
|
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
- @Override
|
|
|
- public Boolean get() {
|
|
|
+ class DefaultCoordination {
|
|
|
+ private Queue<Object> queue = new LinkedBlockingQueue<Object>(1);
|
|
|
+
|
|
|
+ public boolean addToQueue() {
|
|
|
+ return queue.offer(new Object());
|
|
|
+ }
|
|
|
+
|
|
|
+ public Object removeFromQueue() {
|
|
|
+ return queue.poll();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sync() {
|
|
|
+ while (removeFromQueue() == null) {
|
|
|
try {
|
|
|
- if (dfsClient.getClientContext().getDeadNodeDetector()
|
|
|
- .getSuspectNodesProbeQueue().size() > 0) {
|
|
|
- return true;
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- // Ignore the exception
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
}
|
|
|
-
|
|
|
- return false;
|
|
|
}
|
|
|
- }, 5000, 100000);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startWaitForDeadNodeThread(DFSClient dfsClient, int size) {
|
|
|
+ new Thread(() -> {
|
|
|
+ DeadNodeDetector deadNodeDetector =
|
|
|
+ dfsClient.getClientContext().getDeadNodeDetector();
|
|
|
+ while (deadNodeDetector.clearAndGetDetectedDeadNodes().size() != size) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ addToQueue();
|
|
|
+ }).start();
|
|
|
+ }
|
|
|
}
|
|
|
}
|