|
@@ -33,6 +33,7 @@ import java.util.Map;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
import java.util.concurrent.ConcurrentSkipListMap;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
@@ -78,8 +79,8 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
|
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
|
-import org.apache.hadoop.yarn.service.Service;
|
|
|
|
import org.apache.hadoop.yarn.service.Service.STATE;
|
|
import org.apache.hadoop.yarn.service.Service.STATE;
|
|
|
|
+import org.apache.hadoop.yarn.service.ServiceOperations;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
@@ -110,9 +111,7 @@ public class TestNodeStatusUpdater {
|
|
public void tearDown() {
|
|
public void tearDown() {
|
|
this.registeredNodes.clear();
|
|
this.registeredNodes.clear();
|
|
heartBeatID = 0;
|
|
heartBeatID = 0;
|
|
- if (nm != null && nm.getServiceState() == STATE.STARTED) {
|
|
|
|
- nm.stop();
|
|
|
|
- }
|
|
|
|
|
|
+ ServiceOperations.stop(nm);
|
|
DefaultMetricsSystem.shutdown();
|
|
DefaultMetricsSystem.shutdown();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -316,9 +315,11 @@ public class TestNodeStatusUpdater {
|
|
public ResourceTracker resourceTracker =
|
|
public ResourceTracker resourceTracker =
|
|
new MyResourceTracker(this.context);
|
|
new MyResourceTracker(this.context);
|
|
private Context context;
|
|
private Context context;
|
|
- private final long waitStartTime;
|
|
|
|
|
|
+ private long waitStartTime;
|
|
private final long rmStartIntervalMS;
|
|
private final long rmStartIntervalMS;
|
|
private final boolean rmNeverStart;
|
|
private final boolean rmNeverStart;
|
|
|
|
+ private volatile boolean triggered = false;
|
|
|
|
+ private long durationWhenTriggered = -1;
|
|
|
|
|
|
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
|
|
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
|
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
|
@@ -330,15 +331,51 @@ public class TestNodeStatusUpdater {
|
|
this.rmNeverStart = rmNeverStart;
|
|
this.rmNeverStart = rmNeverStart;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ protected void serviceStart() throws Exception {
|
|
|
|
+ //record the startup time
|
|
|
|
+ this.waitStartTime = System.currentTimeMillis();
|
|
|
|
+ super.serviceStart();
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
protected ResourceTracker getRMClient() {
|
|
protected ResourceTracker getRMClient() {
|
|
- if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
|
|
|
|
- || rmNeverStart) {
|
|
|
|
- throw new YarnRuntimeException("Faking RM start failure as start " +
|
|
|
|
- "delay timer has not expired.");
|
|
|
|
- } else {
|
|
|
|
- return resourceTracker;
|
|
|
|
|
|
+ if (!triggered) {
|
|
|
|
+ long t = System.currentTimeMillis();
|
|
|
|
+ long duration = t - waitStartTime;
|
|
|
|
+ if (duration <= rmStartIntervalMS
|
|
|
|
+ || rmNeverStart) {
|
|
|
|
+ throw new YarnRuntimeException("Faking RM start failure as start " +
|
|
|
|
+ "delay timer has not expired.");
|
|
|
|
+ } else {
|
|
|
|
+ //triggering
|
|
|
|
+ triggered = true;
|
|
|
|
+ durationWhenTriggered = duration;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ return resourceTracker;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private boolean isTriggered() {
|
|
|
|
+ return triggered;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private long getWaitStartTime() {
|
|
|
|
+ return waitStartTime;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private long getDurationWhenTriggered() {
|
|
|
|
+ return durationWhenTriggered;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String toString() {
|
|
|
|
+ return "MyNodeStatusUpdater4{" +
|
|
|
|
+ "rmNeverStart=" + rmNeverStart +
|
|
|
|
+ ", triggered=" + triggered +
|
|
|
|
+ ", duration=" + durationWhenTriggered +
|
|
|
|
+ ", rmStartIntervalMS=" + rmStartIntervalMS +
|
|
|
|
+ '}';
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -390,13 +427,10 @@ public class TestNodeStatusUpdater {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void stop() {
|
|
|
|
- super.stop();
|
|
|
|
|
|
+ protected void serviceStop() throws Exception {
|
|
|
|
+ super.serviceStop();
|
|
isStopped = true;
|
|
isStopped = true;
|
|
- try {
|
|
|
|
- syncBarrier.await();
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- }
|
|
|
|
|
|
+ syncBarrier.await(10000, TimeUnit.MILLISECONDS);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//
|
|
//
|
|
@@ -580,7 +614,9 @@ public class TestNodeStatusUpdater {
|
|
nodeStatus.setResponseId(heartBeatID);
|
|
nodeStatus.setResponseId(heartBeatID);
|
|
NodeHeartbeatResponse nhResponse =
|
|
NodeHeartbeatResponse nhResponse =
|
|
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
|
|
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
|
|
- heartBeatNodeAction, null, null, null, 1000L);
|
|
|
|
|
|
+ heartBeatNodeAction,
|
|
|
|
+ null, null, null,
|
|
|
|
+ 1000L);
|
|
return nhResponse;
|
|
return nhResponse;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -724,7 +760,7 @@ public class TestNodeStatusUpdater {
|
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
|
Assert.assertEquals(numCleanups.get(), 1);
|
|
Assert.assertEquals(numCleanups.get(), 1);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testNodeDecommision() throws Exception {
|
|
public void testNodeDecommision() throws Exception {
|
|
nm = getNodeManager(NodeAction.SHUTDOWN);
|
|
nm = getNodeManager(NodeAction.SHUTDOWN);
|
|
@@ -749,12 +785,35 @@ public class TestNodeStatusUpdater {
|
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private abstract class NodeManagerWithCustomNodeStatusUpdater extends NodeManager {
|
|
|
|
+ private NodeStatusUpdater updater;
|
|
|
|
+
|
|
|
|
+ private NodeManagerWithCustomNodeStatusUpdater() {
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
|
+ Dispatcher dispatcher,
|
|
|
|
+ NodeHealthCheckerService healthChecker) {
|
|
|
|
+ updater = createUpdater(context, dispatcher, healthChecker);
|
|
|
|
+ return updater;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public NodeStatusUpdater getUpdater() {
|
|
|
|
+ return updater;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ abstract NodeStatusUpdater createUpdater(Context context,
|
|
|
|
+ Dispatcher dispatcher,
|
|
|
|
+ NodeHealthCheckerService healthChecker);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
- public void testNMShutdownForRegistrationFailure() {
|
|
|
|
|
|
+ public void testNMShutdownForRegistrationFailure() throws Exception {
|
|
|
|
|
|
- nm = new NodeManager() {
|
|
|
|
|
|
+ nm = new NodeManagerWithCustomNodeStatusUpdater() {
|
|
@Override
|
|
@Override
|
|
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
|
|
|
+ protected NodeStatusUpdater createUpdater(Context context,
|
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
|
|
MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
|
|
context, dispatcher, healthChecker, metrics);
|
|
context, dispatcher, healthChecker, metrics);
|
|
@@ -765,14 +824,14 @@ public class TestNodeStatusUpdater {
|
|
return nodeStatusUpdater;
|
|
return nodeStatusUpdater;
|
|
}
|
|
}
|
|
};
|
|
};
|
|
- verifyNodeStartFailure("org.apache.hadoop.yarn.YarnRuntimeException: "
|
|
|
|
- + "Recieved SHUTDOWN signal from Resourcemanager ,"
|
|
|
|
|
|
+ verifyNodeStartFailure(
|
|
|
|
+ "Recieved SHUTDOWN signal from Resourcemanager ,"
|
|
+ "Registration of NodeManager failed, "
|
|
+ "Registration of NodeManager failed, "
|
|
+ "Message from ResourceManager: RM Shutting Down Node");
|
|
+ "Message from ResourceManager: RM Shutting Down Node");
|
|
}
|
|
}
|
|
|
|
|
|
@Test (timeout = 150000)
|
|
@Test (timeout = 150000)
|
|
- public void testNMConnectionToRM() {
|
|
|
|
|
|
+ public void testNMConnectionToRM() throws Exception {
|
|
final long delta = 50000;
|
|
final long delta = 50000;
|
|
final long connectionWaitSecs = 5;
|
|
final long connectionWaitSecs = 5;
|
|
final long connectionRetryIntervalSecs = 1;
|
|
final long connectionRetryIntervalSecs = 1;
|
|
@@ -786,9 +845,10 @@ public class TestNodeStatusUpdater {
|
|
connectionRetryIntervalSecs);
|
|
connectionRetryIntervalSecs);
|
|
|
|
|
|
//Test NM try to connect to RM Several times, but finally fail
|
|
//Test NM try to connect to RM Several times, but finally fail
|
|
- nm = new NodeManager() {
|
|
|
|
|
|
+ NodeManagerWithCustomNodeStatusUpdater nmWithUpdater;
|
|
|
|
+ nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
|
|
@Override
|
|
@Override
|
|
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
|
|
|
+ protected NodeStatusUpdater createUpdater(Context context,
|
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
|
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
|
context, dispatcher, healthChecker, metrics,
|
|
context, dispatcher, healthChecker, metrics,
|
|
@@ -802,19 +862,25 @@ public class TestNodeStatusUpdater {
|
|
nm.start();
|
|
nm.start();
|
|
Assert.fail("NM should have failed to start due to RM connect failure");
|
|
Assert.fail("NM should have failed to start due to RM connect failure");
|
|
} catch(Exception e) {
|
|
} catch(Exception e) {
|
|
- Assert.assertTrue("NM should have tried re-connecting to RM during " +
|
|
|
|
|
|
+ long t = System.currentTimeMillis();
|
|
|
|
+ long duration = t - waitStartTime;
|
|
|
|
+ boolean waitTimeValid = (duration >= connectionWaitSecs * 1000)
|
|
|
|
+ && (duration < (connectionWaitSecs * 1000 + delta));
|
|
|
|
+ if(!waitTimeValid) {
|
|
|
|
+ //either the exception was too early, or it had a different cause.
|
|
|
|
+ //reject with the inner stack trace
|
|
|
|
+ throw new Exception("NM should have tried re-connecting to RM during " +
|
|
"period of at least " + connectionWaitSecs + " seconds, but " +
|
|
"period of at least " + connectionWaitSecs + " seconds, but " +
|
|
"stopped retrying within " + (connectionWaitSecs + delta/1000) +
|
|
"stopped retrying within " + (connectionWaitSecs + delta/1000) +
|
|
- " seconds", (System.currentTimeMillis() - waitStartTime
|
|
|
|
- >= connectionWaitSecs*1000) && (System.currentTimeMillis()
|
|
|
|
- - waitStartTime < (connectionWaitSecs*1000+delta)));
|
|
|
|
|
|
+ " seconds: " + e, e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
//Test NM connect to RM, fail at first several attempts,
|
|
//Test NM connect to RM, fail at first several attempts,
|
|
//but finally success.
|
|
//but finally success.
|
|
- nm = new NodeManager() {
|
|
|
|
|
|
+ nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
|
|
@Override
|
|
@Override
|
|
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
|
|
|
|
|
+ protected NodeStatusUpdater createUpdater(Context context,
|
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
|
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
|
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
|
context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
|
|
context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
|
|
@@ -822,20 +888,33 @@ public class TestNodeStatusUpdater {
|
|
return nodeStatusUpdater;
|
|
return nodeStatusUpdater;
|
|
}
|
|
}
|
|
};
|
|
};
|
|
-
|
|
|
|
nm.init(conf);
|
|
nm.init(conf);
|
|
|
|
+ NodeStatusUpdater updater = nmWithUpdater.getUpdater();
|
|
|
|
+ Assert.assertNotNull("Updater not yet created ", updater);
|
|
waitStartTime = System.currentTimeMillis();
|
|
waitStartTime = System.currentTimeMillis();
|
|
try {
|
|
try {
|
|
nm.start();
|
|
nm.start();
|
|
} catch (Exception ex){
|
|
} catch (Exception ex){
|
|
- Assert.fail("NM should have started successfully " +
|
|
|
|
- "after connecting to RM.");
|
|
|
|
|
|
+ LOG.error("NM should have started successfully " +
|
|
|
|
+ "after connecting to RM.", ex);
|
|
|
|
+ throw ex;
|
|
}
|
|
}
|
|
- Assert.assertTrue("NM should have connected to RM within " + delta/1000
|
|
|
|
- +" seconds of RM starting up.",
|
|
|
|
- (System.currentTimeMillis() - waitStartTime >= rmStartIntervalMS)
|
|
|
|
- && (System.currentTimeMillis() - waitStartTime
|
|
|
|
- < (rmStartIntervalMS+delta)));
|
|
|
|
|
|
+ long duration = System.currentTimeMillis() - waitStartTime;
|
|
|
|
+ MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
|
|
|
|
+ Assert.assertTrue("Updater was never started",
|
|
|
|
+ myUpdater.getWaitStartTime()>0);
|
|
|
|
+ Assert.assertTrue("NM started before updater triggered",
|
|
|
|
+ myUpdater.isTriggered());
|
|
|
|
+ Assert.assertTrue("NM should have connected to RM after "
|
|
|
|
+ +"the start interval of " + rmStartIntervalMS
|
|
|
|
+ +": actual " + duration
|
|
|
|
+ + " " + myUpdater,
|
|
|
|
+ (duration >= rmStartIntervalMS));
|
|
|
|
+ Assert.assertTrue("NM should have connected to RM less than "
|
|
|
|
+ + (rmStartIntervalMS + delta)
|
|
|
|
+ +" milliseconds of RM starting up: actual " + duration
|
|
|
|
+ + " " + myUpdater,
|
|
|
|
+ (duration < (rmStartIntervalMS + delta)));
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -846,7 +925,7 @@ public class TestNodeStatusUpdater {
|
|
* only after NM_EXPIRY interval. See MAPREDUCE-2749.
|
|
* only after NM_EXPIRY interval. See MAPREDUCE-2749.
|
|
*/
|
|
*/
|
|
@Test
|
|
@Test
|
|
- public void testNoRegistrationWhenNMServicesFail() {
|
|
|
|
|
|
+ public void testNoRegistrationWhenNMServicesFail() throws Exception {
|
|
|
|
|
|
nm = new NodeManager() {
|
|
nm = new NodeManager() {
|
|
@Override
|
|
@Override
|
|
@@ -865,7 +944,7 @@ public class TestNodeStatusUpdater {
|
|
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
|
|
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
|
|
metrics, aclsManager, diskhandler) {
|
|
metrics, aclsManager, diskhandler) {
|
|
@Override
|
|
@Override
|
|
- public void start() {
|
|
|
|
|
|
+ protected void serviceStart() {
|
|
// Simulating failure of starting RPC server
|
|
// Simulating failure of starting RPC server
|
|
throw new YarnRuntimeException("Starting of RPC Server failed");
|
|
throw new YarnRuntimeException("Starting of RPC Server failed");
|
|
}
|
|
}
|
|
@@ -961,7 +1040,7 @@ public class TestNodeStatusUpdater {
|
|
nm.init(conf);
|
|
nm.init(conf);
|
|
nm.start();
|
|
nm.start();
|
|
try {
|
|
try {
|
|
- syncBarrier.await();
|
|
|
|
|
|
+ syncBarrier.await(10000, TimeUnit.MILLISECONDS);
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
}
|
|
}
|
|
Assert.assertTrue(((MyNodeManager2) nm).isStopped);
|
|
Assert.assertTrue(((MyNodeManager2) nm).isStopped);
|
|
@@ -1053,20 +1132,25 @@ public class TestNodeStatusUpdater {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void verifyNodeStartFailure(String errMessage) {
|
|
|
|
|
|
+ private void verifyNodeStartFailure(String errMessage) throws Exception {
|
|
|
|
+ Assert.assertNotNull("nm is null", nm);
|
|
YarnConfiguration conf = createNMConfig();
|
|
YarnConfiguration conf = createNMConfig();
|
|
nm.init(conf);
|
|
nm.init(conf);
|
|
try {
|
|
try {
|
|
nm.start();
|
|
nm.start();
|
|
Assert.fail("NM should have failed to start. Didn't get exception!!");
|
|
Assert.fail("NM should have failed to start. Didn't get exception!!");
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- Assert.assertEquals(errMessage, e.getCause()
|
|
|
|
- .getMessage());
|
|
|
|
|
|
+ //the version in trunk looked in the cause for equality
|
|
|
|
+ // and assumed failures were nested.
|
|
|
|
+ //this version assumes that error strings propagate to the base and
|
|
|
|
+ //use a contains() test only. It should be less brittle
|
|
|
|
+ if(!e.getMessage().contains(errMessage)) {
|
|
|
|
+ throw e;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
- // the state change to stopped occurs only if the startup is success, else
|
|
|
|
- // state change doesn't occur
|
|
|
|
- Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
|
|
|
|
|
|
+ // the service should be stopped
|
|
|
|
+ Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm
|
|
.getServiceState());
|
|
.getServiceState());
|
|
|
|
|
|
Assert.assertEquals("Number of registered nodes is wrong!", 0,
|
|
Assert.assertEquals("Number of registered nodes is wrong!", 0,
|