|
@@ -23,17 +23,12 @@ import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
|
|
|
-import org.apache.commons.lang3.time.DateUtils;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
@@ -43,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerResourceUsageReport;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
import org.apache.log4j.Level;
|
|
@@ -84,13 +80,13 @@ public class TestContainerResourceUsage {
|
|
|
|
|
|
RMAppMetrics rmAppMetrics = app0.getRMAppMetrics();
|
|
|
Assert.assertTrue(
|
|
|
- "Before app submittion, memory seconds should have been 0 but was "
|
|
|
- + rmAppMetrics.getMemorySeconds(),
|
|
|
- rmAppMetrics.getMemorySeconds() == 0);
|
|
|
+ "Before app submission, memory seconds should have been 0 but was "
|
|
|
+ + rmAppMetrics.getGuaranteedMemorySeconds(),
|
|
|
+ rmAppMetrics.getGuaranteedMemorySeconds() == 0);
|
|
|
Assert.assertTrue(
|
|
|
"Before app submission, vcore seconds should have been 0 but was "
|
|
|
- + rmAppMetrics.getVcoreSeconds(),
|
|
|
- rmAppMetrics.getVcoreSeconds() == 0);
|
|
|
+ + rmAppMetrics.getGuaranteedVcoreSeconds(),
|
|
|
+ rmAppMetrics.getGuaranteedVcoreSeconds() == 0);
|
|
|
|
|
|
RMAppAttempt attempt0 = app0.getCurrentAppAttempt();
|
|
|
|
|
@@ -105,7 +101,8 @@ public class TestContainerResourceUsage {
|
|
|
// Allow metrics to accumulate.
|
|
|
int sleepInterval = 1000;
|
|
|
int cumulativeSleepTime = 0;
|
|
|
- while (rmAppMetrics.getMemorySeconds() <= 0 && cumulativeSleepTime < 5000) {
|
|
|
+ while (rmAppMetrics.getGuaranteedMemorySeconds() <= 0
|
|
|
+ && cumulativeSleepTime < 5000) {
|
|
|
Thread.sleep(sleepInterval);
|
|
|
cumulativeSleepTime += sleepInterval;
|
|
|
}
|
|
@@ -113,27 +110,35 @@ public class TestContainerResourceUsage {
|
|
|
rmAppMetrics = app0.getRMAppMetrics();
|
|
|
Assert.assertTrue(
|
|
|
"While app is running, memory seconds should be >0 but is "
|
|
|
- + rmAppMetrics.getMemorySeconds(),
|
|
|
- rmAppMetrics.getMemorySeconds() > 0);
|
|
|
+ + rmAppMetrics.getGuaranteedMemorySeconds(),
|
|
|
+ rmAppMetrics.getGuaranteedMemorySeconds() > 0);
|
|
|
Assert.assertTrue(
|
|
|
"While app is running, vcore seconds should be >0 but is "
|
|
|
- + rmAppMetrics.getVcoreSeconds(),
|
|
|
- rmAppMetrics.getVcoreSeconds() > 0);
|
|
|
+ + rmAppMetrics.getGuaranteedVcoreSeconds(),
|
|
|
+ rmAppMetrics.getGuaranteedVcoreSeconds() > 0);
|
|
|
|
|
|
MockRM.finishAMAndVerifyAppState(app0, rm, nm, am0);
|
|
|
|
|
|
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(rmContainer);
|
|
|
rmAppMetrics = app0.getRMAppMetrics();
|
|
|
|
|
|
- Assert.assertEquals("Unexpected MemorySeconds value",
|
|
|
- ru.getMemorySeconds(), rmAppMetrics.getMemorySeconds());
|
|
|
- Assert.assertEquals("Unexpected VcoreSeconds value",
|
|
|
- ru.getVcoreSeconds(), rmAppMetrics.getVcoreSeconds());
|
|
|
+ Assert.assertEquals("Unexpected GuaranteedMemorySeconds value",
|
|
|
+ ru.getGuaranteedMemorySeconds(),
|
|
|
+ rmAppMetrics.getGuaranteedMemorySeconds());
|
|
|
+ Assert.assertEquals("Unexpected GuaranteedVcoreSeconds value",
|
|
|
+ ru.getGuaranteedVcoreSeconds(),
|
|
|
+ rmAppMetrics.getGuaranteedVcoreSeconds());
|
|
|
+ Assert.assertEquals("Unexpected OpportunisticMemorySeconds value",
|
|
|
+ ru.getOpportunisticMemorySeconds(),
|
|
|
+ rmAppMetrics.getOpportunisticMemorySeconds());
|
|
|
+ Assert.assertEquals("Unexpected OpportunisticVcoreSeconds value",
|
|
|
+ ru.getOpportunisticVcoreSeconds(),
|
|
|
+ rmAppMetrics.getOpportunisticVcoreSeconds());
|
|
|
|
|
|
rm.stop();
|
|
|
}
|
|
|
|
|
|
- @Test (timeout = 120000)
|
|
|
+ @Test ()
|
|
|
public void testUsageWithMultipleContainersAndRMRestart() throws Exception {
|
|
|
// Set max attempts to 1 so that when the first attempt fails, the app
|
|
|
// won't try to start a new one.
|
|
@@ -191,7 +196,7 @@ public class TestContainerResourceUsage {
|
|
|
// Allow metrics to accumulate.
|
|
|
int sleepInterval = 1000;
|
|
|
int cumulativeSleepTime = 0;
|
|
|
- while (app0.getRMAppMetrics().getMemorySeconds() <= 0
|
|
|
+ while (app0.getRMAppMetrics().getGuaranteedMemorySeconds() <= 0
|
|
|
&& cumulativeSleepTime < 5000) {
|
|
|
Thread.sleep(sleepInterval);
|
|
|
cumulativeSleepTime += sleepInterval;
|
|
@@ -216,19 +221,29 @@ public class TestContainerResourceUsage {
|
|
|
rm0.waitForState(nm, cId, RMContainerState.COMPLETED);
|
|
|
|
|
|
// Check that the container metrics match those from the app usage report.
|
|
|
- long memorySeconds = 0;
|
|
|
- long vcoreSeconds = 0;
|
|
|
+ long guaranteedMemorySeconds = 0;
|
|
|
+ long guaranteedVcoreSeconds = 0;
|
|
|
+ long opportunisticMemorySeconds = 0;
|
|
|
+ long opportunisticVcoreSeconds = 0;
|
|
|
for (RMContainer c : rmContainers) {
|
|
|
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
|
|
|
- memorySeconds += ru.getMemorySeconds();
|
|
|
- vcoreSeconds += ru.getVcoreSeconds();
|
|
|
+ guaranteedMemorySeconds += ru.getGuaranteedMemorySeconds();
|
|
|
+ guaranteedVcoreSeconds += ru.getGuaranteedVcoreSeconds();
|
|
|
+ opportunisticMemorySeconds += ru.getOpportunisticMemorySeconds();
|
|
|
+ opportunisticVcoreSeconds += ru.getOpportunisticVcoreSeconds();
|
|
|
}
|
|
|
|
|
|
RMAppMetrics metricsBefore = app0.getRMAppMetrics();
|
|
|
- Assert.assertEquals("Unexpected MemorySeconds value",
|
|
|
- memorySeconds, metricsBefore.getMemorySeconds());
|
|
|
- Assert.assertEquals("Unexpected VcoreSeconds value",
|
|
|
- vcoreSeconds, metricsBefore.getVcoreSeconds());
|
|
|
+ Assert.assertEquals("Unexpected GuaranteedMemorySeconds value",
|
|
|
+ guaranteedMemorySeconds, metricsBefore.getGuaranteedMemorySeconds());
|
|
|
+ Assert.assertEquals("Unexpected GuaranteedVcoreSeconds value",
|
|
|
+ guaranteedVcoreSeconds, metricsBefore.getGuaranteedVcoreSeconds());
|
|
|
+ Assert.assertEquals("Unexpected OpportunisticMemorySeconds value",
|
|
|
+ opportunisticMemorySeconds,
|
|
|
+ metricsBefore.getOpportunisticMemorySeconds());
|
|
|
+ Assert.assertEquals("Unexpected OpportunisticVcoreSeconds value",
|
|
|
+ opportunisticVcoreSeconds,
|
|
|
+ metricsBefore.getOpportunisticVcoreSeconds());
|
|
|
|
|
|
// create new RM to represent RM restart. Load up the state store.
|
|
|
MockRM rm1 = new MockRM(conf, memStore);
|
|
@@ -238,10 +253,22 @@ public class TestContainerResourceUsage {
|
|
|
|
|
|
// Compare container resource usage metrics from before and after restart.
|
|
|
RMAppMetrics metricsAfter = app0After.getRMAppMetrics();
|
|
|
- Assert.assertEquals("Vcore seconds were not the same after RM Restart",
|
|
|
- metricsBefore.getVcoreSeconds(), metricsAfter.getVcoreSeconds());
|
|
|
- Assert.assertEquals("Memory seconds were not the same after RM Restart",
|
|
|
- metricsBefore.getMemorySeconds(), metricsAfter.getMemorySeconds());
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Guaranteed vcore seconds were not the same after RM Restart",
|
|
|
+ metricsBefore.getGuaranteedVcoreSeconds(),
|
|
|
+ metricsAfter.getGuaranteedVcoreSeconds());
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Guaranteed memory seconds were not the same after RM Restart",
|
|
|
+ metricsBefore.getGuaranteedMemorySeconds(),
|
|
|
+ metricsAfter.getGuaranteedMemorySeconds());
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Opportunistic vcore seconds were not the same after RM Restart",
|
|
|
+ metricsBefore.getOpportunisticVcoreSeconds(),
|
|
|
+ metricsAfter.getOpportunisticVcoreSeconds());
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Opportunistic memory seconds were not the same after RM Restart",
|
|
|
+ metricsBefore.getOpportunisticMemorySeconds(),
|
|
|
+ metricsAfter.getOpportunisticMemorySeconds());
|
|
|
|
|
|
rm0.stop();
|
|
|
rm0.close();
|
|
@@ -264,10 +291,8 @@ public class TestContainerResourceUsage {
|
|
|
MockRM rm = new MockRM(conf);
|
|
|
rm.start();
|
|
|
|
|
|
- RMApp app =
|
|
|
- rm.submitApp(200, "name", "user",
|
|
|
- new HashMap<ApplicationAccessType, String>(), false, "default", -1,
|
|
|
- null, "MAPREDUCE", false, keepRunningContainers);
|
|
|
+ RMApp app = rm.submitApp(200, "name", "user", new HashMap<>(), false,
|
|
|
+ "default", -1, null, "MAPREDUCE", false, keepRunningContainers);
|
|
|
MockNM nm =
|
|
|
new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService());
|
|
|
nm.registerNode();
|
|
@@ -275,18 +300,17 @@ public class TestContainerResourceUsage {
|
|
|
MockAM am0 = MockRM.launchAndRegisterAM(app, rm, nm);
|
|
|
int NUM_CONTAINERS = 1;
|
|
|
// allocate NUM_CONTAINERS containers
|
|
|
- am0.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
|
|
|
- new ArrayList<ContainerId>());
|
|
|
+ am0.allocate("127.0.0.1", 1024, NUM_CONTAINERS, new ArrayList<>());
|
|
|
nm.nodeHeartbeat(true);
|
|
|
|
|
|
// wait for containers to be allocated.
|
|
|
List<Container> containers =
|
|
|
- am0.allocate(new ArrayList<ResourceRequest>(),
|
|
|
- new ArrayList<ContainerId>()).getAllocatedContainers();
|
|
|
+ am0.allocate(new ArrayList<>(), new ArrayList<>())
|
|
|
+ .getAllocatedContainers();
|
|
|
while (containers.size() != NUM_CONTAINERS) {
|
|
|
nm.nodeHeartbeat(true);
|
|
|
- containers.addAll(am0.allocate(new ArrayList<ResourceRequest>(),
|
|
|
- new ArrayList<ContainerId>()).getAllocatedContainers());
|
|
|
+ containers.addAll(am0.allocate(new ArrayList<>(),
|
|
|
+ new ArrayList<>()).getAllocatedContainers());
|
|
|
Thread.sleep(200);
|
|
|
}
|
|
|
|
|
@@ -294,26 +318,27 @@ public class TestContainerResourceUsage {
|
|
|
ContainerId containerId2 =
|
|
|
ContainerId.newContainerId(am0.getApplicationAttemptId(), 2);
|
|
|
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
|
|
|
- containerId2.getContainerId(), ContainerState.RUNNING);
|
|
|
+ containerId2.getContainerId(), ContainerState.RUNNING);
|
|
|
rm.waitForState(nm, containerId2, RMContainerState.RUNNING);
|
|
|
|
|
|
// Capture the containers here so the metrics can be calculated after the
|
|
|
// app has completed.
|
|
|
- Collection<RMContainer> rmContainers =
|
|
|
- rm.scheduler
|
|
|
- .getSchedulerAppInfo(am0.getApplicationAttemptId())
|
|
|
- .getLiveContainers();
|
|
|
+ Collection<RMContainer> rmContainers = rm.scheduler
|
|
|
+ .getSchedulerAppInfo(am0.getApplicationAttemptId())
|
|
|
+ .getLiveContainers();
|
|
|
|
|
|
// fail the first app attempt by sending CONTAINER_FINISHED event without
|
|
|
// registering.
|
|
|
ContainerId amContainerId =
|
|
|
app.getCurrentAppAttempt().getMasterContainer().getId();
|
|
|
nm.nodeHeartbeat(am0.getApplicationAttemptId(),
|
|
|
- amContainerId.getContainerId(), ContainerState.COMPLETE);
|
|
|
+ amContainerId.getContainerId(), ContainerState.COMPLETE);
|
|
|
rm.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
|
rm.drainEvents();
|
|
|
- long memorySeconds = 0;
|
|
|
- long vcoreSeconds = 0;
|
|
|
+ long guaranteedMemorySeconds = 0;
|
|
|
+ long guaranteedVcoreSeconds = 0;
|
|
|
+ long opportunisticMemorySeconds = 0;
|
|
|
+ long opportunisticVcoreSeconds = 0;
|
|
|
|
|
|
// Calculate container usage metrics for first attempt.
|
|
|
if (keepRunningContainers) {
|
|
@@ -321,8 +346,10 @@ public class TestContainerResourceUsage {
|
|
|
for (RMContainer c : rmContainers) {
|
|
|
if (c.getContainerId().equals(amContainerId)) {
|
|
|
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
|
|
|
- memorySeconds += ru.getMemorySeconds();
|
|
|
- vcoreSeconds += ru.getVcoreSeconds();
|
|
|
+ guaranteedMemorySeconds += ru.getGuaranteedMemorySeconds();
|
|
|
+ guaranteedVcoreSeconds += ru.getGuaranteedVcoreSeconds();
|
|
|
+ opportunisticVcoreSeconds += ru.getOpportunisticVcoreSeconds();
|
|
|
+ opportunisticMemorySeconds += ru.getOpportunisticMemorySeconds();
|
|
|
} else {
|
|
|
// The remaining container should be RUNNING.
|
|
|
Assert.assertTrue("After first attempt failed, remaining container "
|
|
@@ -336,8 +363,10 @@ public class TestContainerResourceUsage {
|
|
|
for (RMContainer c : rmContainers) {
|
|
|
waitforContainerCompletion(rm, nm, amContainerId, c);
|
|
|
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
|
|
|
- memorySeconds += ru.getMemorySeconds();
|
|
|
- vcoreSeconds += ru.getVcoreSeconds();
|
|
|
+ guaranteedMemorySeconds += ru.getGuaranteedMemorySeconds();
|
|
|
+ guaranteedVcoreSeconds += ru.getGuaranteedVcoreSeconds();
|
|
|
+ opportunisticMemorySeconds += ru.getOpportunisticMemorySeconds();
|
|
|
+ opportunisticVcoreSeconds += ru.getOpportunisticVcoreSeconds();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -355,14 +384,12 @@ public class TestContainerResourceUsage {
|
|
|
am1.registerAppAttempt();
|
|
|
rm.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
|
|
// allocate NUM_CONTAINERS containers
|
|
|
- am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
|
|
|
- new ArrayList<ContainerId>());
|
|
|
+ am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, new ArrayList<>());
|
|
|
nm.nodeHeartbeat(true);
|
|
|
|
|
|
// wait for containers to be allocated.
|
|
|
- containers =
|
|
|
- am1.allocate(new ArrayList<ResourceRequest>(),
|
|
|
- new ArrayList<ContainerId>()).getAllocatedContainers();
|
|
|
+ containers = am1.allocate(new ArrayList<>(), new ArrayList<>())
|
|
|
+ .getAllocatedContainers();
|
|
|
while (containers.size() != NUM_CONTAINERS) {
|
|
|
nm.nodeHeartbeat(true);
|
|
|
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
|
|
@@ -388,19 +415,26 @@ public class TestContainerResourceUsage {
|
|
|
for (RMContainer c : rmContainers) {
|
|
|
waitforContainerCompletion(rm, nm, amContainerId, c);
|
|
|
AggregateAppResourceUsage ru = calculateContainerResourceMetrics(c);
|
|
|
- memorySeconds += ru.getMemorySeconds();
|
|
|
- vcoreSeconds += ru.getVcoreSeconds();
|
|
|
+ guaranteedMemorySeconds += ru.getGuaranteedMemorySeconds();
|
|
|
+ guaranteedVcoreSeconds += ru.getGuaranteedVcoreSeconds();
|
|
|
+ opportunisticMemorySeconds += ru.getOpportunisticMemorySeconds();
|
|
|
+ opportunisticVcoreSeconds += ru.getOpportunisticVcoreSeconds();
|
|
|
}
|
|
|
|
|
|
RMAppMetrics rmAppMetrics = app.getRMAppMetrics();
|
|
|
|
|
|
- Assert.assertEquals("Unexpected MemorySeconds value",
|
|
|
- memorySeconds, rmAppMetrics.getMemorySeconds());
|
|
|
- Assert.assertEquals("Unexpected VcoreSeconds value",
|
|
|
- vcoreSeconds, rmAppMetrics.getVcoreSeconds());
|
|
|
+ Assert.assertEquals("Unexpected GuaranteedMemorySeconds value",
|
|
|
+ guaranteedMemorySeconds, rmAppMetrics.getGuaranteedMemorySeconds());
|
|
|
+ Assert.assertEquals("Unexpected GuaranteedVcoreSeconds value",
|
|
|
+ guaranteedVcoreSeconds, rmAppMetrics.getGuaranteedVcoreSeconds());
|
|
|
+ Assert.assertEquals("Unexpected OpportunisticMemorySeconds value",
|
|
|
+ opportunisticMemorySeconds,
|
|
|
+ rmAppMetrics.getOpportunisticMemorySeconds());
|
|
|
+ Assert.assertEquals("Unexpected OpportunisticVcoreSeconds value",
|
|
|
+ opportunisticVcoreSeconds,
|
|
|
+ rmAppMetrics.getOpportunisticVcoreSeconds());
|
|
|
|
|
|
rm.stop();
|
|
|
- return;
|
|
|
}
|
|
|
|
|
|
private void waitforContainerCompletion(MockRM rm, MockNM nm,
|
|
@@ -419,16 +453,10 @@ public class TestContainerResourceUsage {
|
|
|
|
|
|
private AggregateAppResourceUsage calculateContainerResourceMetrics(
|
|
|
RMContainer rmContainer) {
|
|
|
- Resource resource = rmContainer.getContainer().getResource();
|
|
|
- long usedMillis =
|
|
|
- rmContainer.getFinishTime() - rmContainer.getCreationTime();
|
|
|
- long memorySeconds = resource.getMemorySize()
|
|
|
- * usedMillis / DateUtils.MILLIS_PER_SECOND;
|
|
|
- long vcoreSeconds = resource.getVirtualCores()
|
|
|
- * usedMillis / DateUtils.MILLIS_PER_SECOND;
|
|
|
- Map<String, Long> map = new HashMap<>();
|
|
|
- map.put(ResourceInformation.MEMORY_MB.getName(), memorySeconds);
|
|
|
- map.put(ResourceInformation.VCORES.getName(), vcoreSeconds);
|
|
|
- return new AggregateAppResourceUsage(map);
|
|
|
+ ContainerResourceUsageReport resourceUsageReport =
|
|
|
+ rmContainer.getResourceUsageReport();
|
|
|
+ return new AggregateAppResourceUsage(
|
|
|
+ resourceUsageReport.getGuaranteedResourceUsageSecondsMap(),
|
|
|
+ resourceUsageReport.getOpportunisticResourceSecondsMap());
|
|
|
}
|
|
|
}
|