|
@@ -26,6 +26,7 @@ import java.net.URISyntaxException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Vector;
|
|
@@ -43,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|
|
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
+import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
|
@@ -73,6 +78,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
|
|
|
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
|
|
@@ -147,7 +153,7 @@ public class ApplicationMaster {
|
|
|
|
|
|
// Handle to communicate with the Resource Manager
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
- private AMRMClientAsync resourceManager;
|
|
|
+ private AMRMClientAsync amRMClient;
|
|
|
|
|
|
// Handle to communicate with the Node Manager
|
|
|
private NMClientAsync nmClientAsync;
|
|
@@ -206,7 +212,9 @@ public class ApplicationMaster {
|
|
|
|
|
|
private volatile boolean done;
|
|
|
private volatile boolean success;
|
|
|
-
|
|
|
+
|
|
|
+ private ByteBuffer allTokens;
|
|
|
+
|
|
|
// Launch threads
|
|
|
private List<Thread> launchThreads = new ArrayList<Thread>();
|
|
|
|
|
@@ -441,11 +449,24 @@ public class ApplicationMaster {
|
|
|
public boolean run() throws YarnException, IOException {
|
|
|
LOG.info("Starting ApplicationMaster");
|
|
|
|
|
|
+ Credentials credentials =
|
|
|
+ UserGroupInformation.getCurrentUser().getCredentials();
|
|
|
+ DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
+ credentials.writeTokenStorageToStream(dob);
|
|
|
+ // Now remove the AM->RM token so that containers cannot access it.
|
|
|
+ Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ Token<?> token = iter.next();
|
|
|
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
|
|
+ iter.remove();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
+
|
|
|
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
|
|
|
- resourceManager =
|
|
|
- AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
|
|
- resourceManager.init(conf);
|
|
|
- resourceManager.start();
|
|
|
+ amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
|
|
|
+ amRMClient.init(conf);
|
|
|
+ amRMClient.start();
|
|
|
|
|
|
containerListener = new NMCallbackHandler();
|
|
|
nmClientAsync = new NMClientAsyncImpl(containerListener);
|
|
@@ -460,7 +481,7 @@ public class ApplicationMaster {
|
|
|
|
|
|
// Register self with ResourceManager
|
|
|
// This will start heartbeating to the RM
|
|
|
- RegisterApplicationMasterResponse response = resourceManager
|
|
|
+ RegisterApplicationMasterResponse response = amRMClient
|
|
|
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
|
|
|
appMasterTrackingUrl);
|
|
|
// Dump out information about cluster capability as seen by the
|
|
@@ -485,7 +506,7 @@ public class ApplicationMaster {
|
|
|
// executed on them ( regardless of success/failure).
|
|
|
for (int i = 0; i < numTotalContainers; ++i) {
|
|
|
ContainerRequest containerAsk = setupContainerAskForRM();
|
|
|
- resourceManager.addContainerRequest(containerAsk);
|
|
|
+ amRMClient.addContainerRequest(containerAsk);
|
|
|
}
|
|
|
numRequestedContainers.set(numTotalContainers);
|
|
|
|
|
@@ -535,7 +556,7 @@ public class ApplicationMaster {
|
|
|
success = false;
|
|
|
}
|
|
|
try {
|
|
|
- resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
|
|
|
+ amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
|
|
|
} catch (YarnException ex) {
|
|
|
LOG.error("Failed to unregister application", ex);
|
|
|
} catch (IOException e) {
|
|
@@ -543,7 +564,7 @@ public class ApplicationMaster {
|
|
|
}
|
|
|
|
|
|
done = true;
|
|
|
- resourceManager.stop();
|
|
|
+ amRMClient.stop();
|
|
|
}
|
|
|
|
|
|
private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
|
|
@@ -595,7 +616,7 @@ public class ApplicationMaster {
|
|
|
if (askCount > 0) {
|
|
|
for (int i = 0; i < askCount; ++i) {
|
|
|
ContainerRequest containerAsk = setupContainerAskForRM();
|
|
|
- resourceManager.addContainerRequest(containerAsk);
|
|
|
+ amRMClient.addContainerRequest(containerAsk);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -651,7 +672,7 @@ public class ApplicationMaster {
|
|
|
@Override
|
|
|
public void onError(Throwable e) {
|
|
|
done = true;
|
|
|
- resourceManager.stop();
|
|
|
+ amRMClient.stop();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -807,6 +828,14 @@ public class ApplicationMaster {
|
|
|
commands.add(command.toString());
|
|
|
ctx.setCommands(commands);
|
|
|
|
|
|
+ // Set up tokens for the container too. Today, for normal shell commands,
|
|
|
+ // the container in distribute-shell doesn't need any tokens. We are
|
|
|
+ // populating them mainly for NodeManagers to be able to download any
|
|
|
+ // files in the distributed file-system. The tokens are otherwise also
|
|
|
+ // useful in cases, for e.g., when one is running a "hadoop dfs" command
|
|
|
+ // inside the distributed shell.
|
|
|
+ ctx.setTokens(allTokens);
|
|
|
+
|
|
|
containerListener.addContainer(container.getId(), container);
|
|
|
nmClientAsync.startContainerAsync(container, ctx);
|
|
|
}
|