浏览代码

YARN-72. Forgot to add 2 files to branch-0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1469056 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 12 年之前
父节点
当前提交
0f60da7893

+ 92 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java

@@ -0,0 +1,92 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+
+/**
+ * This class allows a node manager to run without without communicating with a
+ * real RM.
+ */
+public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
+  static final Log LOG = LogFactory.getLog(MockNodeStatusUpdater.class);
+  
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  private ResourceTracker resourceTracker;
+
+  public MockNodeStatusUpdater(Context context, Dispatcher dispatcher,
+      NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+    super(context, dispatcher, healthChecker, metrics);
+    resourceTracker = new MockResourceTracker();
+  }
+
+  @Override
+  protected ResourceTracker getRMClient() {
+    return resourceTracker;
+  }
+  
+  private static class MockResourceTracker implements ResourceTracker {
+    private int heartBeatID;
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      RegistrationResponse regResponse = recordFactory
+          .newRecordInstance(RegistrationResponse.class);
+
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setRegistrationResponse(regResponse);
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      NodeStatus nodeStatus = request.getNodeStatus();
+      LOG.info("Got heartbeat number " + heartBeatID);
+      nodeStatus.setResponseId(heartBeatID++);
+
+      HeartbeatResponse response = recordFactory
+          .newRecordInstance(HeartbeatResponse.class);
+      response.setResponseId(heartBeatID);
+
+      NodeHeartbeatResponse nhResponse = recordFactory
+          .newRecordInstance(NodeHeartbeatResponse.class);
+      nhResponse.setHeartbeatResponse(response);
+      return nhResponse;
+    }
+  }
+}

+ 222 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java

@@ -0,0 +1,222 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeManagerShutdown {
+  static final File basedir =
+      new File("target", TestNodeManagerShutdown.class.getName());
+  static final File tmpDir = new File(basedir, "tmpDir");
+  static final File logsDir = new File(basedir, "logs");
+  static final File remoteLogsDir = new File(basedir, "remotelogs");
+  static final File nmLocalDir = new File(basedir, "nm0");
+  static final File processStartFile = new File(tmpDir, "start_file.txt")
+    .getAbsoluteFile();
+
+  static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+  static final String user = "nobody";
+  private FileContext localFS;
+
+  @Before
+  public void setup() throws UnsupportedFileSystemException {
+    localFS = FileContext.getLocalFSFileContext();
+    tmpDir.mkdirs();
+    logsDir.mkdirs();
+    remoteLogsDir.mkdirs();
+    nmLocalDir.mkdirs();
+  }
+  
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    localFS.delete(new Path(basedir.getPath()), true);
+  }
+  
+  @Test
+  public void testKillContainersOnShutdown() throws IOException {
+    NodeManager nm = getNodeManager();
+    nm.init(createNMConfig());
+    nm.start();
+    
+    ContainerManagerImpl containerManager = nm.getContainerManager();
+    File scriptFile = createUnhaltingScriptFile();
+    
+    ContainerLaunchContext containerLaunchContext = 
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    // Construct the Container-id
+    ContainerId cId = createContainerId();
+    containerLaunchContext.setContainerId(cId);
+
+    containerLaunchContext.setUser(user);
+
+    URL localResourceUri =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(scriptFile.getAbsolutePath())));
+    LocalResource localResource =
+        recordFactory.newRecordInstance(LocalResource.class);
+    localResource.setResource(localResourceUri);
+    localResource.setSize(-1);
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResource.setType(LocalResourceType.FILE);
+    localResource.setTimestamp(scriptFile.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources = 
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, localResource);
+    containerLaunchContext.setLocalResources(localResources);
+    containerLaunchContext.setUser(containerLaunchContext.getUser());
+    List<String> commands = new ArrayList<String>();
+    commands.add("/bin/bash");
+    commands.add(scriptFile.getAbsolutePath());
+    containerLaunchContext.setCommands(commands);
+    containerLaunchContext.setResource(recordFactory
+        .newRecordInstance(Resource.class));
+    containerLaunchContext.getResource().setMemory(1024);
+    StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(containerLaunchContext);
+    containerManager.startContainer(startRequest);
+    
+    GetContainerStatusRequest request =
+        recordFactory.newRecordInstance(GetContainerStatusRequest.class);
+        request.setContainerId(cId);
+    ContainerStatus containerStatus =
+        containerManager.getContainerStatus(request).getStatus();
+    Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
+    
+    try {Thread.sleep(5000);} catch (InterruptedException ex) {ex.printStackTrace();} 
+    
+    nm.stop();
+    
+    // Now verify the contents of the file
+    // Script generates a message when it receives a sigterm
+    // so we look for that
+    BufferedReader reader =
+        new BufferedReader(new FileReader(processStartFile));
+
+    boolean foundSigTermMessage = false;
+    while (true) {
+      String line = reader.readLine();
+      if (line == null) {
+        break;
+      }
+      if (line.contains("SIGTERM")) {
+        foundSigTermMessage = true;
+        break;
+      }
+    }
+    Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
+    reader.close();
+  }
+  
+  private ContainerId createContainerId() {
+    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+    appId.setClusterTimestamp(0);
+    appId.setId(0);
+    ApplicationAttemptId appAttemptId = 
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
+    ContainerId containerId = 
+        recordFactory.newRecordInstance(ContainerId.class);
+    containerId.setApplicationAttemptId(appAttemptId);
+    return containerId;
+  }
+  
+  private YarnConfiguration createNMConfig() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
+    conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
+    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
+    return conf;
+  }
+  
+  /**
+   * Creates a script to run a container that will run forever unless
+   * stopped by external means.
+   */
+  private File createUnhaltingScriptFile() throws IOException {
+    File scriptFile = new File(tmpDir, "scriptFile.sh");
+    BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile));
+    fileWriter.write("#!/bin/bash\n\n");
+    fileWriter.write("echo \"Running testscript for delayed kill\"\n");
+    fileWriter.write("hello=\"Got SIGTERM\"\n");
+    fileWriter.write("umask 0\n");
+    fileWriter.write("trap \"echo $hello >> " + processStartFile + "\" SIGTERM\n");
+    fileWriter.write("echo \"Writing pid to start file\"\n");
+    fileWriter.write("echo $$ >> " + processStartFile + "\n");
+    fileWriter.write("while true; do\nsleep 1s;\ndone\n");
+
+    fileWriter.close();
+    return scriptFile;
+  }
+
+  private NodeManager getNodeManager() {
+    return new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics);
+        return myNodeStatusUpdater;
+      }
+    };
+  }
+}