|
@@ -30,6 +30,7 @@ import java.net.InetSocketAddress;
|
|
import java.net.UnknownHostException;
|
|
import java.net.UnknownHostException;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Arrays;
|
|
import java.util.EnumSet;
|
|
import java.util.EnumSet;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
@@ -67,13 +68,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
|
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
@@ -303,13 +305,11 @@ public class TestRMRestart {
|
|
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
|
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
|
nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
|
|
nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
|
|
|
|
|
|
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
|
|
|
|
- ContainerStatus containerStatus =
|
|
|
|
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
|
|
|
|
- .getCurrentAppAttempt().getAppAttemptId(), 1),
|
|
|
|
- ContainerState.COMPLETE, "Killed AM container", 143);
|
|
|
|
- containerStatuses.add(containerStatus);
|
|
|
|
- nm1.registerNode(containerStatuses);
|
|
|
|
|
|
+ NMContainerStatus status =
|
|
|
|
+ TestRMRestart
|
|
|
|
+ .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
|
|
|
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
|
+ nm1.registerNode(Arrays.asList(status));
|
|
nm2.registerNode();
|
|
nm2.registerNode();
|
|
|
|
|
|
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
|
|
@@ -510,14 +510,11 @@ public class TestRMRestart {
|
|
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
|
|
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
|
|
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
|
|
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
|
|
.getAppAttemptState());
|
|
.getAppAttemptState());
|
|
-
|
|
|
|
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
|
|
|
|
- ContainerStatus containerStatus =
|
|
|
|
- BuilderUtils.newContainerStatus(
|
|
|
|
- BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
|
|
|
|
- ContainerState.COMPLETE, "Killed AM container", 143);
|
|
|
|
- containerStatuses.add(containerStatus);
|
|
|
|
- nm1.registerNode(containerStatuses);
|
|
|
|
|
|
+
|
|
|
|
+ NMContainerStatus status =
|
|
|
|
+ TestRMRestart.createNMContainerStatus(
|
|
|
|
+ am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
|
+ nm1.registerNode(Arrays.asList(status));
|
|
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
launchAM(rmApp, rm2, nm1);
|
|
launchAM(rmApp, rm2, nm1);
|
|
Assert.assertEquals(3, rmApp.getAppAttempts().size());
|
|
Assert.assertEquals(3, rmApp.getAppAttempts().size());
|
|
@@ -1678,13 +1675,12 @@ public class TestRMRestart {
|
|
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
|
|
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
|
|
nm1.nodeHeartbeat(true);
|
|
nm1.nodeHeartbeat(true);
|
|
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
|
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
|
|
- List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
|
|
|
|
- ContainerStatus containerStatus =
|
|
|
|
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
|
|
|
|
- .getCurrentAppAttempt().getAppAttemptId(), 1),
|
|
|
|
- ContainerState.COMPLETE, "Killed AM container", 143);
|
|
|
|
- containerStatuses.add(containerStatus);
|
|
|
|
- nm1.registerNode(containerStatuses);
|
|
|
|
|
|
+
|
|
|
|
+ NMContainerStatus status =
|
|
|
|
+ TestRMRestart
|
|
|
|
+ .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
|
|
|
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
|
+ nm1.registerNode(Arrays.asList(status));
|
|
while (loadedApp1.getAppAttempts().size() != 2) {
|
|
while (loadedApp1.getAppAttempts().size() != 2) {
|
|
Thread.sleep(200);
|
|
Thread.sleep(200);
|
|
}
|
|
}
|
|
@@ -1808,12 +1804,10 @@ public class TestRMRestart {
|
|
// ResourceTrackerService is started.
|
|
// ResourceTrackerService is started.
|
|
super.serviceStart();
|
|
super.serviceStart();
|
|
nm1.setResourceTrackerService(getResourceTrackerService());
|
|
nm1.setResourceTrackerService(getResourceTrackerService());
|
|
- List<ContainerStatus> status = new ArrayList<ContainerStatus>();
|
|
|
|
- ContainerId amContainer =
|
|
|
|
- ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
|
|
|
|
- status.add(ContainerStatus.newInstance(amContainer,
|
|
|
|
- ContainerState.COMPLETE, "AM container exit", 143));
|
|
|
|
- nm1.registerNode(status);
|
|
|
|
|
|
+ NMContainerStatus status =
|
|
|
|
+ TestRMRestart.createNMContainerStatus(
|
|
|
|
+ am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
|
+ nm1.registerNode(Arrays.asList(status));
|
|
}
|
|
}
|
|
};
|
|
};
|
|
}
|
|
}
|
|
@@ -1852,6 +1846,15 @@ public class TestRMRestart {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public static NMContainerStatus createNMContainerStatus(
|
|
|
|
+ ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
|
|
|
|
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
|
|
|
|
+ NMContainerStatus containerReport =
|
|
|
|
+ NMContainerStatus.newInstance(containerId, containerState,
|
|
|
|
+ Resource.newInstance(1024, 1), "recover container", 0);
|
|
|
|
+ return containerReport;
|
|
|
|
+ }
|
|
|
|
+
|
|
public class TestMemoryRMStateStore extends MemoryRMStateStore {
|
|
public class TestMemoryRMStateStore extends MemoryRMStateStore {
|
|
int count = 0;
|
|
int count = 0;
|
|
public int updateApp = 0;
|
|
public int updateApp = 0;
|