|
@@ -18,7 +18,11 @@
|
|
|
package org.apache.hadoop.hdfs.tools;
|
|
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import com.google.common.collect.Lists;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -31,6 +35,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
@@ -42,6 +47,7 @@ import java.io.PrintStream;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Scanner;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
import static org.hamcrest.CoreMatchers.allOf;
|
|
|
import static org.hamcrest.CoreMatchers.anyOf;
|
|
@@ -89,12 +95,6 @@ public class TestDFSAdmin {
|
|
|
namenode = cluster.getNameNode();
|
|
|
}
|
|
|
|
|
|
- private void startReconfiguration(String nodeType, String address,
|
|
|
- final List<String> outs, final List<String> errs) throws IOException {
|
|
|
- reconfigurationOutErrFormatter("startReconfiguration", nodeType,
|
|
|
- address, outs, errs);
|
|
|
- }
|
|
|
-
|
|
|
private void getReconfigurableProperties(String nodeType, String address,
|
|
|
final List<String> outs, final List<String> errs) throws IOException {
|
|
|
reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
|
|
@@ -151,9 +151,10 @@ public class TestDFSAdmin {
|
|
|
* @param expectedSuccuss set true if the reconfiguration task should success.
|
|
|
* @throws IOException
|
|
|
* @throws InterruptedException
|
|
|
+ * @throws TimeoutException
|
|
|
*/
|
|
|
private void testDataNodeGetReconfigurationStatus(boolean expectedSuccuss)
|
|
|
- throws IOException, InterruptedException {
|
|
|
+ throws IOException, InterruptedException, TimeoutException {
|
|
|
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
|
|
|
datanode.setReconfigurationUtil(ru);
|
|
|
|
|
@@ -179,21 +180,10 @@ public class TestDFSAdmin {
|
|
|
|
|
|
assertThat(admin.startReconfiguration("datanode", address), is(0));
|
|
|
|
|
|
- int count = 100;
|
|
|
final List<String> outs = Lists.newArrayList();
|
|
|
final List<String> errs = Lists.newArrayList();
|
|
|
- while (count > 0) {
|
|
|
- outs.clear();
|
|
|
- errs.clear();
|
|
|
- getReconfigurationStatus("datanode", address, outs, errs);
|
|
|
- if (!outs.isEmpty() && outs.get(0).contains("finished")) {
|
|
|
- break;
|
|
|
- }
|
|
|
- count--;
|
|
|
- Thread.sleep(100);
|
|
|
- }
|
|
|
- LOG.info(String.format("count=%d", count));
|
|
|
- assertTrue(count > 0);
|
|
|
+ awaitReconfigurationFinished("datanode", address, outs, errs);
|
|
|
+
|
|
|
if (expectedSuccuss) {
|
|
|
assertThat(outs.size(), is(4));
|
|
|
} else {
|
|
@@ -232,59 +222,89 @@ public class TestDFSAdmin {
|
|
|
|
|
|
@Test(timeout = 30000)
|
|
|
public void testDataNodeGetReconfigurationStatus() throws IOException,
|
|
|
- InterruptedException {
|
|
|
+ InterruptedException, TimeoutException {
|
|
|
testDataNodeGetReconfigurationStatus(true);
|
|
|
restartCluster();
|
|
|
testDataNodeGetReconfigurationStatus(false);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout = 30000)
|
|
|
- public void testNameNodeStartReconfiguration() throws IOException {
|
|
|
- final String address = namenode.getHostAndPort();
|
|
|
- final List<String> outs = Lists.newArrayList();
|
|
|
- final List<String> errs = Lists.newArrayList();
|
|
|
- startReconfiguration("namenode", address, outs, errs);
|
|
|
- assertEquals(0, outs.size());
|
|
|
- assertTrue(errs.size() > 1);
|
|
|
- assertThat(
|
|
|
- errs.get(0),
|
|
|
- is(allOf(containsString("Namenode"), containsString("reconfiguring:"),
|
|
|
- containsString("startReconfiguration"),
|
|
|
- containsString("is not implemented"),
|
|
|
- containsString("UnsupportedOperationException"))));
|
|
|
- }
|
|
|
-
|
|
|
@Test(timeout = 30000)
|
|
|
public void testNameNodeGetReconfigurableProperties() throws IOException {
|
|
|
final String address = namenode.getHostAndPort();
|
|
|
final List<String> outs = Lists.newArrayList();
|
|
|
final List<String> errs = Lists.newArrayList();
|
|
|
getReconfigurableProperties("namenode", address, outs, errs);
|
|
|
- assertEquals(0, outs.size());
|
|
|
- assertTrue(errs.size() > 1);
|
|
|
- assertThat(
|
|
|
- errs.get(0),
|
|
|
- is(allOf(containsString("Namenode"),
|
|
|
- containsString("reconfiguration:"),
|
|
|
- containsString("listReconfigurableProperties"),
|
|
|
- containsString("is not implemented"),
|
|
|
- containsString("UnsupportedOperationException"))));
|
|
|
+ assertEquals(3, outs.size());
|
|
|
+ assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
|
|
|
+ assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
|
|
|
+ assertEquals(errs.size(), 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ void awaitReconfigurationFinished(final String nodeType,
|
|
|
+ final String address, final List<String> outs, final List<String> errs)
|
|
|
+ throws TimeoutException, IOException, InterruptedException {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ outs.clear();
|
|
|
+ errs.clear();
|
|
|
+ try {
|
|
|
+ getReconfigurationStatus(nodeType, address, outs, errs);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error(String.format(
|
|
|
+ "call getReconfigurationStatus on %s[%s] failed.", nodeType,
|
|
|
+ address), e);
|
|
|
+ }
|
|
|
+ return !outs.isEmpty() && outs.get(0).contains("finished");
|
|
|
+
|
|
|
+ }
|
|
|
+ }, 100, 100 * 100);
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 30000)
|
|
|
- public void testNameNodeGetReconfigurationStatus() throws IOException {
|
|
|
+ public void testNameNodeGetReconfigurationStatus() throws IOException,
|
|
|
+ InterruptedException, TimeoutException {
|
|
|
+ ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
|
|
|
+ namenode.setReconfigurationUtil(ru);
|
|
|
final String address = namenode.getHostAndPort();
|
|
|
+
|
|
|
+ List<ReconfigurationUtil.PropertyChange> changes =
|
|
|
+ new ArrayList<>();
|
|
|
+ changes.add(new ReconfigurationUtil.PropertyChange(
|
|
|
+ DFS_HEARTBEAT_INTERVAL_KEY, String.valueOf(6),
|
|
|
+ namenode.getConf().get(DFS_HEARTBEAT_INTERVAL_KEY)));
|
|
|
+ changes.add(new ReconfigurationUtil.PropertyChange(
|
|
|
+ "randomKey", "new123", "old456"));
|
|
|
+ when(ru.parseChangedProperties(any(Configuration.class),
|
|
|
+ any(Configuration.class))).thenReturn(changes);
|
|
|
+ assertThat(admin.startReconfiguration("namenode", address), is(0));
|
|
|
+
|
|
|
final List<String> outs = Lists.newArrayList();
|
|
|
final List<String> errs = Lists.newArrayList();
|
|
|
- getReconfigurationStatus("namenode", address, outs, errs);
|
|
|
- assertEquals(0, outs.size());
|
|
|
- assertTrue(errs.size() > 1);
|
|
|
- assertThat(
|
|
|
- errs.get(0),
|
|
|
- is(allOf(containsString("Namenode"),
|
|
|
- containsString("reloading configuration:"),
|
|
|
- containsString("getReconfigurationStatus"),
|
|
|
- containsString("is not implemented"),
|
|
|
- containsString("UnsupportedOperationException"))));
|
|
|
+ awaitReconfigurationFinished("namenode", address, outs, errs);
|
|
|
+
|
|
|
+ // verify change
|
|
|
+ assertEquals(
|
|
|
+ DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
|
|
|
+ 6,
|
|
|
+ namenode
|
|
|
+ .getConf()
|
|
|
+ .getLong(DFS_HEARTBEAT_INTERVAL_KEY,
|
|
|
+ DFS_HEARTBEAT_INTERVAL_DEFAULT));
|
|
|
+ assertEquals(DFS_HEARTBEAT_INTERVAL_KEY + " has wrong value",
|
|
|
+ 6,
|
|
|
+ namenode
|
|
|
+ .getNamesystem()
|
|
|
+ .getBlockManager()
|
|
|
+ .getDatanodeManager()
|
|
|
+ .getHeartbeatInterval());
|
|
|
+
|
|
|
+ int offset = 1;
|
|
|
+ assertThat(outs.get(offset), containsString("SUCCESS: Changed property "
|
|
|
+ + DFS_HEARTBEAT_INTERVAL_KEY));
|
|
|
+ assertThat(outs.get(offset + 1),
|
|
|
+ is(allOf(containsString("From:"), containsString("3"))));
|
|
|
+ assertThat(outs.get(offset + 2),
|
|
|
+ is(allOf(containsString("To:"), containsString("6"))));
|
|
|
}
|
|
|
}
|