|
@@ -21,28 +21,51 @@ import static org.junit.Assert.*;
|
|
|
import static org.mockito.Mockito.*;
|
|
|
|
|
|
import java.io.File;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.Field;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+import junit.framework.Assert;
|
|
|
|
|
|
import org.apache.commons.io.FileUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.io.Text;
|
|
|
+import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
+import org.apache.hadoop.mapreduce.OutputCommitter;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
|
|
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.security.AccessControlException;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
+import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
+import org.apache.log4j.Level;
|
|
|
+import org.apache.log4j.LogManager;
|
|
|
+import org.apache.log4j.Logger;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.Test;
|
|
@@ -50,13 +73,20 @@ import org.junit.Test;
|
|
|
public class TestMRAppMaster {
|
|
|
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
|
|
|
static String stagingDir = "staging/";
|
|
|
+ private static FileContext localFS = null;
|
|
|
+ private static final File testDir = new File("target",
|
|
|
+ TestMRAppMaster.class.getName() + "-tmpDir").getAbsoluteFile();
|
|
|
|
|
|
@BeforeClass
|
|
|
- public static void setup() {
|
|
|
+ public static void setup() throws AccessControlException,
|
|
|
+ FileNotFoundException, IllegalArgumentException, IOException {
|
|
|
//Do not error out if metrics are inited multiple times
|
|
|
DefaultMetricsSystem.setMiniClusterMode(true);
|
|
|
File dir = new File(stagingDir);
|
|
|
stagingDir = dir.getAbsolutePath();
|
|
|
+ localFS = FileContext.getLocalFSFileContext();
|
|
|
+ localFS.delete(new Path(testDir.getAbsolutePath()), true);
|
|
|
+ testDir.mkdir();
|
|
|
}
|
|
|
|
|
|
@Before
|
|
@@ -81,7 +111,7 @@ public class TestMRAppMaster {
|
|
|
MRAppMasterTest appMaster =
|
|
|
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
|
|
System.currentTimeMillis());
|
|
|
- YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ JobConf conf = new JobConf();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
|
|
assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
|
|
@@ -94,7 +124,7 @@ public class TestMRAppMaster {
|
|
|
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
|
|
|
String containerIdStr = "container_1317529182569_0004_000002_1";
|
|
|
String userName = "TestAppMasterUser";
|
|
|
- YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ JobConf conf = new JobConf();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
ApplicationAttemptId applicationAttemptId = ConverterUtils
|
|
|
.toApplicationAttemptId(applicationAttemptIdStr);
|
|
@@ -107,7 +137,7 @@ public class TestMRAppMaster {
|
|
|
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
|
|
MRAppMaster appMaster =
|
|
|
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
|
|
- System.currentTimeMillis(), false);
|
|
|
+ System.currentTimeMillis(), false, false);
|
|
|
boolean caught = false;
|
|
|
try {
|
|
|
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
|
@@ -128,7 +158,7 @@ public class TestMRAppMaster {
|
|
|
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
|
|
|
String containerIdStr = "container_1317529182569_0004_000002_1";
|
|
|
String userName = "TestAppMasterUser";
|
|
|
- YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ JobConf conf = new JobConf();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
ApplicationAttemptId applicationAttemptId = ConverterUtils
|
|
|
.toApplicationAttemptId(applicationAttemptIdStr);
|
|
@@ -142,7 +172,7 @@ public class TestMRAppMaster {
|
|
|
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
|
|
MRAppMaster appMaster =
|
|
|
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
|
|
- System.currentTimeMillis(), false);
|
|
|
+ System.currentTimeMillis(), false, false);
|
|
|
boolean caught = false;
|
|
|
try {
|
|
|
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
|
@@ -163,7 +193,7 @@ public class TestMRAppMaster {
|
|
|
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
|
|
|
String containerIdStr = "container_1317529182569_0004_000002_1";
|
|
|
String userName = "TestAppMasterUser";
|
|
|
- YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ JobConf conf = new JobConf();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
ApplicationAttemptId applicationAttemptId = ConverterUtils
|
|
|
.toApplicationAttemptId(applicationAttemptIdStr);
|
|
@@ -177,7 +207,7 @@ public class TestMRAppMaster {
|
|
|
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
|
|
MRAppMaster appMaster =
|
|
|
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
|
|
- System.currentTimeMillis(), false);
|
|
|
+ System.currentTimeMillis(), false, false);
|
|
|
boolean caught = false;
|
|
|
try {
|
|
|
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
|
@@ -198,7 +228,7 @@ public class TestMRAppMaster {
|
|
|
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
|
|
|
String containerIdStr = "container_1317529182569_0004_000002_1";
|
|
|
String userName = "TestAppMasterUser";
|
|
|
- YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ JobConf conf = new JobConf();
|
|
|
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
|
|
|
ApplicationAttemptId applicationAttemptId = ConverterUtils
|
|
|
.toApplicationAttemptId(applicationAttemptIdStr);
|
|
@@ -212,7 +242,7 @@ public class TestMRAppMaster {
|
|
|
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
|
|
|
MRAppMaster appMaster =
|
|
|
new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
|
|
- System.currentTimeMillis(), false);
|
|
|
+ System.currentTimeMillis(), false, false);
|
|
|
boolean caught = false;
|
|
|
try {
|
|
|
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
|
@@ -228,39 +258,155 @@ public class TestMRAppMaster {
|
|
|
assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
|
|
|
appMaster.stop();
|
|
|
}
|
|
|
+
|
|
|
+ // A dirty hack to modify the env of the current JVM itself - Dirty, but
|
|
|
+ // should be okay for testing.
|
|
|
+ @SuppressWarnings({ "rawtypes", "unchecked" })
|
|
|
+ private static void setNewEnvironmentHack(Map<String, String> newenv)
|
|
|
+ throws Exception {
|
|
|
+ try {
|
|
|
+ Class<?> cl = Class.forName("java.lang.ProcessEnvironment");
|
|
|
+ Field field = cl.getDeclaredField("theEnvironment");
|
|
|
+ field.setAccessible(true);
|
|
|
+ Map<String, String> env = (Map<String, String>) field.get(null);
|
|
|
+ env.clear();
|
|
|
+ env.putAll(newenv);
|
|
|
+ Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment");
|
|
|
+ ciField.setAccessible(true);
|
|
|
+ Map<String, String> cienv = (Map<String, String>) ciField.get(null);
|
|
|
+ cienv.clear();
|
|
|
+ cienv.putAll(newenv);
|
|
|
+ } catch (NoSuchFieldException e) {
|
|
|
+ Class[] classes = Collections.class.getDeclaredClasses();
|
|
|
+ Map<String, String> env = System.getenv();
|
|
|
+ for (Class cl : classes) {
|
|
|
+ if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
|
|
|
+ Field field = cl.getDeclaredField("m");
|
|
|
+ field.setAccessible(true);
|
|
|
+ Object obj = field.get(env);
|
|
|
+ Map<String, String> map = (Map<String, String>) obj;
|
|
|
+ map.clear();
|
|
|
+ map.putAll(newenv);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testMRAppMasterCredentials() throws Exception {
|
|
|
+
|
|
|
+ Logger rootLogger = LogManager.getRootLogger();
|
|
|
+ rootLogger.setLevel(Level.DEBUG);
|
|
|
+
|
|
|
+ // Simulate credentials passed to AM via client->RM->NM
|
|
|
+ Credentials credentials = new Credentials();
|
|
|
+ byte[] identifier = "MyIdentifier".getBytes();
|
|
|
+ byte[] password = "MyPassword".getBytes();
|
|
|
+ Text kind = new Text("MyTokenKind");
|
|
|
+ Text service = new Text("host:port");
|
|
|
+ Token<? extends TokenIdentifier> myToken =
|
|
|
+ new Token<TokenIdentifier>(identifier, password, kind, service);
|
|
|
+ Text tokenAlias = new Text("myToken");
|
|
|
+ credentials.addToken(tokenAlias, myToken);
|
|
|
+ Text keyAlias = new Text("mySecretKeyAlias");
|
|
|
+ credentials.addSecretKey(keyAlias, "mySecretKey".getBytes());
|
|
|
+ Token<? extends TokenIdentifier> storedToken =
|
|
|
+ credentials.getToken(tokenAlias);
|
|
|
+
|
|
|
+ JobConf conf = new JobConf();
|
|
|
+
|
|
|
+ Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file");
|
|
|
+ Map<String, String> newEnv = new HashMap<String, String>();
|
|
|
+ newEnv.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFilePath
|
|
|
+ .toUri().getPath());
|
|
|
+ setNewEnvironmentHack(newEnv);
|
|
|
+ credentials.writeTokenStorageFile(tokenFilePath, conf);
|
|
|
+
|
|
|
+ ApplicationId appId = BuilderUtils.newApplicationId(12345, 56);
|
|
|
+ ApplicationAttemptId applicationAttemptId =
|
|
|
+ BuilderUtils.newApplicationAttemptId(appId, 1);
|
|
|
+ ContainerId containerId =
|
|
|
+ BuilderUtils.newContainerId(applicationAttemptId, 546);
|
|
|
+ String userName = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
+
|
|
|
+ // Create staging dir, so MRAppMaster doesn't barf.
|
|
|
+ File stagingDir =
|
|
|
+ new File(MRApps.getStagingAreaDir(conf, userName).toString());
|
|
|
+ stagingDir.mkdirs();
|
|
|
+
|
|
|
+ // Set login-user to null as that is how real world MRApp starts with.
|
|
|
+ // This is null is the reason why token-file is read by UGI.
|
|
|
+ UserGroupInformation.setLoginUser(null);
|
|
|
+
|
|
|
+ MRAppMasterTest appMaster =
|
|
|
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
|
|
|
+ System.currentTimeMillis(), false, true);
|
|
|
+ MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
|
|
|
+
|
|
|
+ // Now validate the credentials
|
|
|
+ Credentials appMasterCreds = conf.getCredentials();
|
|
|
+ Assert.assertNotNull(appMasterCreds);
|
|
|
+ Assert.assertEquals(1, appMasterCreds.numberOfSecretKeys());
|
|
|
+ Assert.assertEquals(1, appMasterCreds.numberOfTokens());
|
|
|
+
|
|
|
+ // Validate the tokens
|
|
|
+ Token<? extends TokenIdentifier> usedToken =
|
|
|
+ appMasterCreds.getToken(tokenAlias);
|
|
|
+ Assert.assertNotNull(usedToken);
|
|
|
+ Assert.assertEquals(storedToken, usedToken);
|
|
|
+
|
|
|
+ // Validate the keys
|
|
|
+ byte[] usedKey = appMasterCreds.getSecretKey(keyAlias);
|
|
|
+ Assert.assertNotNull(usedKey);
|
|
|
+ Assert.assertEquals("mySecretKey", new String(usedKey));
|
|
|
+
|
|
|
+ // The credentials should also be added to conf so that OuputCommitter can
|
|
|
+ // access it
|
|
|
+ Credentials confCredentials = conf.getCredentials();
|
|
|
+ Assert.assertEquals(1, confCredentials.numberOfSecretKeys());
|
|
|
+ Assert.assertEquals(1, confCredentials.numberOfTokens());
|
|
|
+ Assert.assertEquals(storedToken, confCredentials.getToken(tokenAlias));
|
|
|
+ Assert.assertEquals("mySecretKey",
|
|
|
+ new String(confCredentials.getSecretKey(keyAlias)));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
class MRAppMasterTest extends MRAppMaster {
|
|
|
|
|
|
Path stagingDirPath;
|
|
|
private Configuration conf;
|
|
|
- private boolean overrideInitAndStart;
|
|
|
+ private boolean overrideInit;
|
|
|
+ private boolean overrideStart;
|
|
|
ContainerAllocator mockContainerAllocator;
|
|
|
+ CommitterEventHandler mockCommitterEventHandler;
|
|
|
+ RMHeartbeatHandler mockRMHeartbeatHandler;
|
|
|
|
|
|
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
|
|
|
ContainerId containerId, String host, int port, int httpPort,
|
|
|
long submitTime) {
|
|
|
- this(applicationAttemptId, containerId, host, port, httpPort, submitTime,
|
|
|
- true);
|
|
|
+ this(applicationAttemptId, containerId, host, port, httpPort,
|
|
|
+ submitTime, true, true);
|
|
|
}
|
|
|
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
|
|
|
ContainerId containerId, String host, int port, int httpPort,
|
|
|
- long submitTime, boolean overrideInitAndStart) {
|
|
|
+ long submitTime, boolean overrideInit, boolean overrideStart) {
|
|
|
super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
|
|
|
- this.overrideInitAndStart = overrideInitAndStart;
|
|
|
+ this.overrideInit = overrideInit;
|
|
|
+ this.overrideStart = overrideStart;
|
|
|
mockContainerAllocator = mock(ContainerAllocator.class);
|
|
|
+ mockCommitterEventHandler = mock(CommitterEventHandler.class);
|
|
|
+ mockRMHeartbeatHandler = mock(RMHeartbeatHandler.class);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void init(Configuration conf) {
|
|
|
- if (overrideInitAndStart) {
|
|
|
- this.conf = conf;
|
|
|
- } else {
|
|
|
+ if (!overrideInit) {
|
|
|
super.init(conf);
|
|
|
}
|
|
|
+ this.conf = conf;
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
+
|
|
|
+ @Override
|
|
|
protected void downloadTokensAndSetupUGI(Configuration conf) {
|
|
|
try {
|
|
|
this.currentUser = UserGroupInformation.getCurrentUser();
|
|
@@ -268,16 +414,27 @@ class MRAppMasterTest extends MRAppMaster {
|
|
|
throw new YarnException(e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Override
|
|
|
protected ContainerAllocator createContainerAllocator(
|
|
|
final ClientService clientService, final AppContext context) {
|
|
|
return mockContainerAllocator;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected EventHandler<CommitterEvent> createCommitterEventHandler(
|
|
|
+ AppContext context, OutputCommitter committer) {
|
|
|
+ return mockCommitterEventHandler;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected RMHeartbeatHandler getRMHeartbeatHandler() {
|
|
|
+ return mockRMHeartbeatHandler;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void start() {
|
|
|
- if (overrideInitAndStart) {
|
|
|
+ if (overrideStart) {
|
|
|
try {
|
|
|
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
stagingDirPath = MRApps.getStagingAreaDir(conf, user);
|