|
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Lists;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.ReconfigurationUtil;
|
|
import org.apache.hadoop.conf.ReconfigurationUtil;
|
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
@@ -42,6 +43,7 @@ import static org.hamcrest.CoreMatchers.allOf;
|
|
import static org.hamcrest.CoreMatchers.anyOf;
|
|
import static org.hamcrest.CoreMatchers.anyOf;
|
|
import static org.hamcrest.CoreMatchers.is;
|
|
import static org.hamcrest.CoreMatchers.is;
|
|
import static org.hamcrest.CoreMatchers.not;
|
|
import static org.hamcrest.CoreMatchers.not;
|
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertThat;
|
|
import static org.junit.Assert.assertThat;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.hamcrest.CoreMatchers.containsString;
|
|
import static org.hamcrest.CoreMatchers.containsString;
|
|
@@ -50,18 +52,17 @@ import static org.mockito.Mockito.mock;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
public class TestDFSAdmin {
|
|
public class TestDFSAdmin {
|
|
|
|
+ private Configuration conf = null;
|
|
private MiniDFSCluster cluster;
|
|
private MiniDFSCluster cluster;
|
|
private DFSAdmin admin;
|
|
private DFSAdmin admin;
|
|
private DataNode datanode;
|
|
private DataNode datanode;
|
|
|
|
|
|
@Before
|
|
@Before
|
|
public void setUp() throws Exception {
|
|
public void setUp() throws Exception {
|
|
- Configuration conf = new Configuration();
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
|
- cluster.waitActive();
|
|
|
|
|
|
+ conf = new Configuration();
|
|
|
|
+ restartCluster();
|
|
|
|
|
|
admin = new DFSAdmin();
|
|
admin = new DFSAdmin();
|
|
- datanode = cluster.getDataNodes().get(0);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@After
|
|
@After
|
|
@@ -72,6 +73,15 @@ public class TestDFSAdmin {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void restartCluster() throws IOException {
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
|
+ cluster.waitActive();
|
|
|
|
+ datanode = cluster.getDataNodes().get(0);
|
|
|
|
+ }
|
|
|
|
+
|
|
private List<String> getReconfigureStatus(String nodeType, String address)
|
|
private List<String> getReconfigureStatus(String nodeType, String address)
|
|
throws IOException {
|
|
throws IOException {
|
|
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
|
|
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
|
|
@@ -87,16 +97,26 @@ public class TestDFSAdmin {
|
|
return outputs;
|
|
return outputs;
|
|
}
|
|
}
|
|
|
|
|
|
- @Test(timeout = 30000)
|
|
|
|
- public void testGetReconfigureStatus()
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Test reconfiguration and check the status outputs.
|
|
|
|
+ * @param expectedSuccuss set true if the reconfiguration task should success.
|
|
|
|
+ * @throws IOException
|
|
|
|
+ * @throws InterruptedException
|
|
|
|
+ */
|
|
|
|
+ private void testGetReconfigurationStatus(boolean expectedSuccuss)
|
|
throws IOException, InterruptedException {
|
|
throws IOException, InterruptedException {
|
|
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
|
|
ReconfigurationUtil ru = mock(ReconfigurationUtil.class);
|
|
datanode.setReconfigurationUtil(ru);
|
|
datanode.setReconfigurationUtil(ru);
|
|
|
|
|
|
List<ReconfigurationUtil.PropertyChange> changes =
|
|
List<ReconfigurationUtil.PropertyChange> changes =
|
|
- new ArrayList<ReconfigurationUtil.PropertyChange>();
|
|
|
|
|
|
+ new ArrayList<>();
|
|
File newDir = new File(cluster.getDataDirectory(), "data_new");
|
|
File newDir = new File(cluster.getDataDirectory(), "data_new");
|
|
- newDir.mkdirs();
|
|
|
|
|
|
+ if (expectedSuccuss) {
|
|
|
|
+ newDir.mkdirs();
|
|
|
|
+ } else {
|
|
|
|
+ // Inject failure.
|
|
|
|
+ newDir.createNewFile();
|
|
|
|
+ }
|
|
changes.add(new ReconfigurationUtil.PropertyChange(
|
|
changes.add(new ReconfigurationUtil.PropertyChange(
|
|
DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
|
|
DFS_DATANODE_DATA_DIR_KEY, newDir.toString(),
|
|
datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
|
datanode.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
|
@@ -121,31 +141,74 @@ public class TestDFSAdmin {
|
|
Thread.sleep(100);
|
|
Thread.sleep(100);
|
|
}
|
|
}
|
|
assertTrue(count > 0);
|
|
assertTrue(count > 0);
|
|
- assertThat(outputs.size(), is(8)); // 3 (SUCCESS) + 4 (FAILED)
|
|
|
|
|
|
+ if (expectedSuccuss) {
|
|
|
|
+ assertThat(outputs.size(), is(4));
|
|
|
|
+ } else {
|
|
|
|
+ assertThat(outputs.size(), is(6));
|
|
|
|
+ }
|
|
|
|
|
|
List<StorageLocation> locations = DataNode.getStorageLocations(
|
|
List<StorageLocation> locations = DataNode.getStorageLocations(
|
|
datanode.getConf());
|
|
datanode.getConf());
|
|
- assertThat(locations.size(), is(1));
|
|
|
|
- assertThat(locations.get(0).getFile(), is(newDir));
|
|
|
|
- // Verify the directory is appropriately formatted.
|
|
|
|
- assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
|
|
|
|
-
|
|
|
|
- int successOffset = outputs.get(1).startsWith("SUCCESS:") ? 1 : 5;
|
|
|
|
- int failedOffset = outputs.get(1).startsWith("FAILED:") ? 1: 4;
|
|
|
|
- assertThat(outputs.get(successOffset),
|
|
|
|
- containsString("Change property " + DFS_DATANODE_DATA_DIR_KEY));
|
|
|
|
- assertThat(outputs.get(successOffset + 1),
|
|
|
|
|
|
+ if (expectedSuccuss) {
|
|
|
|
+ assertThat(locations.size(), is(1));
|
|
|
|
+ assertThat(locations.get(0).getFile(), is(newDir));
|
|
|
|
+ // Verify the directory is appropriately formatted.
|
|
|
|
+ assertTrue(new File(newDir, Storage.STORAGE_DIR_CURRENT).isDirectory());
|
|
|
|
+ } else {
|
|
|
|
+ assertTrue(locations.isEmpty());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ int offset = 1;
|
|
|
|
+ if (expectedSuccuss) {
|
|
|
|
+ assertThat(outputs.get(offset),
|
|
|
|
+ containsString("SUCCESS: Changed property " +
|
|
|
|
+ DFS_DATANODE_DATA_DIR_KEY));
|
|
|
|
+ } else {
|
|
|
|
+ assertThat(outputs.get(offset),
|
|
|
|
+ containsString("FAILED: Change property " +
|
|
|
|
+ DFS_DATANODE_DATA_DIR_KEY));
|
|
|
|
+ }
|
|
|
|
+ assertThat(outputs.get(offset + 1),
|
|
is(allOf(containsString("From:"), containsString("data1"),
|
|
is(allOf(containsString("From:"), containsString("data1"),
|
|
containsString("data2"))));
|
|
containsString("data2"))));
|
|
- assertThat(outputs.get(successOffset + 2),
|
|
|
|
|
|
+ assertThat(outputs.get(offset + 2),
|
|
is(not(anyOf(containsString("data1"), containsString("data2")))));
|
|
is(not(anyOf(containsString("data1"), containsString("data2")))));
|
|
- assertThat(outputs.get(successOffset + 2),
|
|
|
|
|
|
+ assertThat(outputs.get(offset + 2),
|
|
is(allOf(containsString("To"), containsString("data_new"))));
|
|
is(allOf(containsString("To"), containsString("data_new"))));
|
|
- assertThat(outputs.get(failedOffset),
|
|
|
|
- containsString("Change property randomKey"));
|
|
|
|
- assertThat(outputs.get(failedOffset + 1),
|
|
|
|
- containsString("From: \"old456\""));
|
|
|
|
- assertThat(outputs.get(failedOffset + 2),
|
|
|
|
- containsString("To: \"new123\""));
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(timeout = 30000)
|
|
|
|
+ public void testGetReconfigurationStatus()
|
|
|
|
+ throws IOException, InterruptedException {
|
|
|
|
+ testGetReconfigurationStatus(true);
|
|
|
|
+ restartCluster();
|
|
|
|
+ testGetReconfigurationStatus(false);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private List<String> getReconfigurationAllowedProperties(
|
|
|
|
+ String nodeType, String address)
|
|
|
|
+ throws IOException {
|
|
|
|
+ ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
|
|
|
|
+ PrintStream out = new PrintStream(bufOut);
|
|
|
|
+ ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
|
|
|
|
+ PrintStream err = new PrintStream(bufErr);
|
|
|
|
+ admin.getReconfigurableProperties(nodeType, address, out, err);
|
|
|
|
+ Scanner scanner = new Scanner(bufOut.toString());
|
|
|
|
+ List<String> outputs = Lists.newArrayList();
|
|
|
|
+ while (scanner.hasNextLine()) {
|
|
|
|
+ outputs.add(scanner.nextLine());
|
|
|
|
+ }
|
|
|
|
+ return outputs;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(timeout = 30000)
|
|
|
|
+ public void testGetReconfigAllowedProperties() throws IOException {
|
|
|
|
+ final int port = datanode.getIpcPort();
|
|
|
|
+ final String address = "localhost:" + port;
|
|
|
|
+ List<String> outputs =
|
|
|
|
+ getReconfigurationAllowedProperties("datanode", address);
|
|
|
|
+ assertEquals(2, outputs.size());
|
|
|
|
+ assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
|
|
|
+ outputs.get(1));
|
|
}
|
|
}
|
|
}
|
|
}
|