|
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
@@ -82,6 +83,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
@@ -105,6 +107,9 @@ import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
+import com.google.common.collect.Sets;
|
|
|
+
|
|
|
public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
private final static File TEMP_DIR = new File(System.getProperty(
|
|
|
"test.build.data", "/tmp"), "decommision");
|
|
@@ -2036,4 +2041,90 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Test does following verification
|
|
|
+ // 1. Start RM1 with store patch /tmp
|
|
|
+ // 2. Add/remove/replace labels to cluster and node lable and verify
|
|
|
+ // 3. Start RM2 with store patch /tmp only
|
|
|
+ // 4. Get cluster and node lobel, it should be present by recovering it
|
|
|
+ @Test(timeout = 20000)
|
|
|
+ public void testRMRestartRecoveringNodeLabelManager() throws Exception {
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ memStore.init(conf);
|
|
|
+ MockRM rm1 = new MockRM(conf, memStore) {
|
|
|
+ @Override
|
|
|
+ protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
|
|
+ mgr.init(getConfig());
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm1.init(conf);
|
|
|
+ rm1.start();
|
|
|
+
|
|
|
+ RMNodeLabelsManager nodeLabelManager =
|
|
|
+ rm1.getRMContext().getNodeLabelManager();
|
|
|
+
|
|
|
+ Set<String> clusterNodeLabels = new HashSet<String>();
|
|
|
+ clusterNodeLabels.add("x");
|
|
|
+ clusterNodeLabels.add("y");
|
|
|
+ clusterNodeLabels.add("z");
|
|
|
+ // Add node label x,y,z
|
|
|
+ nodeLabelManager.addToCluserNodeLabels(clusterNodeLabels);
|
|
|
+
|
|
|
+ // Add node Label to Node h1->x
|
|
|
+ NodeId n1 = NodeId.newInstance("h1", 0);
|
|
|
+ nodeLabelManager.addLabelsToNode(ImmutableMap.of(n1, toSet("x")));
|
|
|
+
|
|
|
+ clusterNodeLabels.remove("z");
|
|
|
+ // Remove cluster label z
|
|
|
+ nodeLabelManager.removeFromClusterNodeLabels(toSet("z"));
|
|
|
+
|
|
|
+ // Replace nodelabel h1->x,y
|
|
|
+ nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n1, toSet("x", "y")));
|
|
|
+
|
|
|
+ // Wait for updating store.It is expected NodeStore update should happen
|
|
|
+ // very fast since it has separate dispatcher. So waiting for max 5 seconds,
|
|
|
+ // which is sufficient time to update NodeStore.
|
|
|
+ int count = 10;
|
|
|
+ while (count-- > 0) {
|
|
|
+ if (nodeLabelManager.getNodeLabels().size() > 0) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(500);
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals(clusterNodeLabels.size(), nodeLabelManager
|
|
|
+ .getClusterNodeLabels().size());
|
|
|
+
|
|
|
+ Map<NodeId, Set<String>> nodeLabels = nodeLabelManager.getNodeLabels();
|
|
|
+ Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
|
|
+ Assert.assertTrue(nodeLabels.get(n1).equals(toSet("x", "y")));
|
|
|
+
|
|
|
+ MockRM rm2 = new MockRM(conf, memStore) {
|
|
|
+ @Override
|
|
|
+ protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
|
|
+ mgr.init(getConfig());
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm2.init(conf);
|
|
|
+ rm2.start();
|
|
|
+
|
|
|
+ nodeLabelManager = rm2.getRMContext().getNodeLabelManager();
|
|
|
+ Assert.assertEquals(clusterNodeLabels.size(), nodeLabelManager
|
|
|
+ .getClusterNodeLabels().size());
|
|
|
+
|
|
|
+ nodeLabels = nodeLabelManager.getNodeLabels();
|
|
|
+ Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
|
|
+ Assert.assertTrue(nodeLabels.get(n1).equals(toSet("x", "y")));
|
|
|
+ rm1.stop();
|
|
|
+ rm2.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ private <E> Set<E> toSet(E... elements) {
|
|
|
+ Set<E> set = Sets.newHashSet(elements);
|
|
|
+ return set;
|
|
|
+ }
|
|
|
+
|
|
|
}
|