|
@@ -22,11 +22,13 @@ import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -149,10 +151,18 @@ public class AdminStatesBaseTest {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * decommission the DN or put the DN into maintenance for datanodeUuid or one
|
|
|
- * random node if datanodeUuid is null.
|
|
|
- * And wait for the node to reach the given {@code waitForState}.
|
|
|
+ /**
|
|
|
+ * Decommission or perform Maintenance for DataNodes and wait for them to
|
|
|
+ * reach the expected state.
|
|
|
+ *
|
|
|
+ * @param nnIndex NameNode index
|
|
|
+ * @param datanodeUuid DataNode to decommission/maintenance, or a random
|
|
|
+ * DataNode if null
|
|
|
+ * @param maintenanceExpirationInMS Maintenance expiration time
|
|
|
+ * @param decommissionedNodes List of DataNodes already decommissioned
|
|
|
+ * @param waitForState Await for this state for datanodeUuid DataNode
|
|
|
+ * @return DatanodeInfo DataNode taken out of service
|
|
|
+ * @throws IOException
|
|
|
*/
|
|
|
protected DatanodeInfo takeNodeOutofService(int nnIndex,
|
|
|
String datanodeUuid, long maintenanceExpirationInMS,
|
|
@@ -162,48 +172,91 @@ public class AdminStatesBaseTest {
|
|
|
maintenanceExpirationInMS, decommissionedNodes, null, waitForState);
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * decommission the DN or put the DN to maintenance set by datanodeUuid
|
|
|
- * Pick randome node if datanodeUuid == null
|
|
|
- * wait for the node to reach the given {@code waitForState}.
|
|
|
+ /**
|
|
|
+ * Decommission or perform Maintenance for DataNodes and wait for them to
|
|
|
+ * reach the expected state.
|
|
|
+ *
|
|
|
+ * @param nnIndex NameNode index
|
|
|
+ * @param datanodeUuid DataNode to decommission/maintenance, or a random
|
|
|
+ * DataNode if null
|
|
|
+ * @param maintenanceExpirationInMS Maintenance expiration time
|
|
|
+ * @param decommissionedNodes List of DataNodes already decommissioned
|
|
|
+ * @param inMaintenanceNodes Map of DataNodes already entering/in maintenance
|
|
|
+ * @param waitForState Await for this state for datanodeUuid DataNode
|
|
|
+ * @return DatanodeInfo DataNode taken out of service
|
|
|
+ * @throws IOException
|
|
|
*/
|
|
|
protected DatanodeInfo takeNodeOutofService(int nnIndex,
|
|
|
String datanodeUuid, long maintenanceExpirationInMS,
|
|
|
List<DatanodeInfo> decommissionedNodes,
|
|
|
Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
|
|
|
throws IOException {
|
|
|
+ return takeNodeOutofService(nnIndex, (datanodeUuid != null ?
|
|
|
+ Lists.newArrayList(datanodeUuid) : null),
|
|
|
+ maintenanceExpirationInMS, decommissionedNodes, inMaintenanceNodes,
|
|
|
+ waitForState).get(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Decommission or perform Maintenance for DataNodes and wait for them to
|
|
|
+ * reach the expected state.
|
|
|
+ *
|
|
|
+ * @param nnIndex NameNode index
|
|
|
+ * @param dataNodeUuids DataNodes to decommission/maintenance, or a random
|
|
|
+ * DataNode if null
|
|
|
+ * @param maintenanceExpirationInMS Maintenance expiration time
|
|
|
+ * @param decommissionedNodes List of DataNodes already decommissioned
|
|
|
+ * @param inMaintenanceNodes Map of DataNodes already entering/in maintenance
|
|
|
+ * @param waitForState Await for this state for datanodeUuid DataNode
|
|
|
+ * @return DatanodeInfo DataNode taken out of service
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ protected List<DatanodeInfo> takeNodeOutofService(int nnIndex,
|
|
|
+ List<String> dataNodeUuids, long maintenanceExpirationInMS,
|
|
|
+ List<DatanodeInfo> decommissionedNodes,
|
|
|
+ Map<DatanodeInfo, Long> inMaintenanceNodes, AdminStates waitForState)
|
|
|
+ throws IOException {
|
|
|
DFSClient client = getDfsClient(nnIndex);
|
|
|
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.ALL);
|
|
|
boolean isDecommissionRequest =
|
|
|
waitForState == AdminStates.DECOMMISSION_INPROGRESS ||
|
|
|
- waitForState == AdminStates.DECOMMISSIONED;
|
|
|
+ waitForState == AdminStates.DECOMMISSIONED;
|
|
|
|
|
|
- //
|
|
|
- // pick one datanode randomly unless the caller specifies one.
|
|
|
- //
|
|
|
- int index = 0;
|
|
|
- if (datanodeUuid == null) {
|
|
|
+ List<String> dataNodeNames = new ArrayList<>();
|
|
|
+ List<DatanodeInfo> datanodeInfos = new ArrayList<>();
|
|
|
+ // pick one DataNode randomly unless the caller specifies one.
|
|
|
+ if (dataNodeUuids == null) {
|
|
|
boolean found = false;
|
|
|
while (!found) {
|
|
|
- index = myrand.nextInt(info.length);
|
|
|
+ int index = myrand.nextInt(info.length);
|
|
|
if ((isDecommissionRequest && !info[index].isDecommissioned()) ||
|
|
|
(!isDecommissionRequest && !info[index].isInMaintenance())) {
|
|
|
+ dataNodeNames.add(info[index].getXferAddr());
|
|
|
+ datanodeInfos.add(NameNodeAdapter.getDatanode(
|
|
|
+ cluster.getNamesystem(nnIndex), info[index]));
|
|
|
found = true;
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- // The caller specifies a DN
|
|
|
- for (; index < info.length; index++) {
|
|
|
- if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
|
|
|
- break;
|
|
|
+ // The caller specified a DataNode
|
|
|
+ for (String datanodeUuid : dataNodeUuids) {
|
|
|
+ boolean found = false;
|
|
|
+ for (int index = 0; index < info.length; index++) {
|
|
|
+ if (info[index].getDatanodeUuid().equals(datanodeUuid)) {
|
|
|
+ dataNodeNames.add(info[index].getXferAddr());
|
|
|
+ datanodeInfos.add(NameNodeAdapter.getDatanode(
|
|
|
+ cluster.getNamesystem(nnIndex), info[index]));
|
|
|
+ found = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!found) {
|
|
|
+ throw new IOException("invalid datanodeUuid " + datanodeUuid);
|
|
|
}
|
|
|
- }
|
|
|
- if (index == info.length) {
|
|
|
- throw new IOException("invalid datanodeUuid " + datanodeUuid);
|
|
|
}
|
|
|
}
|
|
|
- String nodename = info[index].getXferAddr();
|
|
|
- LOG.info("Taking node: " + nodename + " out of service");
|
|
|
+ LOG.info("Taking node: " + Arrays.toString(dataNodeNames.toArray())
|
|
|
+ + " out of service");
|
|
|
|
|
|
ArrayList<String> decommissionNodes = new ArrayList<String>();
|
|
|
if (decommissionedNodes != null) {
|
|
@@ -220,18 +273,20 @@ public class AdminStatesBaseTest {
|
|
|
}
|
|
|
|
|
|
if (isDecommissionRequest) {
|
|
|
- decommissionNodes.add(nodename);
|
|
|
+ for (String dataNodeName : dataNodeNames) {
|
|
|
+ decommissionNodes.add(dataNodeName);
|
|
|
+ }
|
|
|
} else {
|
|
|
- maintenanceNodes.put(nodename, maintenanceExpirationInMS);
|
|
|
+ for (String dataNodeName : dataNodeNames) {
|
|
|
+ maintenanceNodes.put(dataNodeName, maintenanceExpirationInMS);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// write node names into the json host file.
|
|
|
hostsFileWriter.initOutOfServiceHosts(decommissionNodes, maintenanceNodes);
|
|
|
refreshNodes(nnIndex);
|
|
|
- DatanodeInfo ret = NameNodeAdapter.getDatanode(
|
|
|
- cluster.getNamesystem(nnIndex), info[index]);
|
|
|
- waitNodeState(ret, waitForState);
|
|
|
- return ret;
|
|
|
+ waitNodeState(datanodeInfos, waitForState);
|
|
|
+ return datanodeInfos;
|
|
|
}
|
|
|
|
|
|
/* Ask a specific NN to put the datanode in service and wait for it
|
|
@@ -270,23 +325,31 @@ public class AdminStatesBaseTest {
|
|
|
putNodeInService(nnIndex, datanodeInfo);
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * Wait till node is transitioned to the expected state.
|
|
|
+ /**
|
|
|
+ * Wait till DataNode is transitioned to the expected state.
|
|
|
*/
|
|
|
- protected void waitNodeState(DatanodeInfo node,
|
|
|
- AdminStates state) {
|
|
|
- boolean done = state == node.getAdminState();
|
|
|
- while (!done) {
|
|
|
- LOG.info("Waiting for node " + node + " to change state to "
|
|
|
- + state + " current state: " + node.getAdminState());
|
|
|
- try {
|
|
|
- Thread.sleep(HEARTBEAT_INTERVAL * 500);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // nothing
|
|
|
+ protected void waitNodeState(DatanodeInfo node, AdminStates state) {
|
|
|
+ waitNodeState(Lists.newArrayList(node), state);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait till all DataNodes are transitioned to the expected state.
|
|
|
+ */
|
|
|
+ protected void waitNodeState(List<DatanodeInfo> nodes, AdminStates state) {
|
|
|
+ for (DatanodeInfo node : nodes) {
|
|
|
+ boolean done = (state == node.getAdminState());
|
|
|
+ while (!done) {
|
|
|
+ LOG.info("Waiting for node " + node + " to change state to "
|
|
|
+ + state + " current state: " + node.getAdminState());
|
|
|
+ try {
|
|
|
+ Thread.sleep(HEARTBEAT_INTERVAL * 500);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // nothing
|
|
|
+ }
|
|
|
+ done = (state == node.getAdminState());
|
|
|
}
|
|
|
- done = state == node.getAdminState();
|
|
|
+ LOG.info("node " + node + " reached the state " + state);
|
|
|
}
|
|
|
- LOG.info("node " + node + " reached the state " + state);
|
|
|
}
|
|
|
|
|
|
protected void initIncludeHost(String hostNameAndPort) throws IOException {
|