|
@@ -28,12 +28,15 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.registry.client.api.RegistryConstants;
|
|
|
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
|
|
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
|
|
|
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
|
|
+import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
|
@@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.*;
|
|
|
import org.apache.hadoop.yarn.client.api.AppAdminClient;
|
|
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
|
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
|
|
|
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
@@ -79,6 +83,9 @@ import org.slf4j.LoggerFactory;
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.URI;
|
|
|
+import java.net.URISyntaxException;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.text.MessageFormat;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -98,7 +105,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
//TODO disable retry so that client / rest API doesn't block?
|
|
|
protected YarnClient yarnClient;
|
|
|
// Avoid looking up applicationId from fs all the time.
|
|
|
- private Map<String, ApplicationId> cachedAppIds = new ConcurrentHashMap<>();
|
|
|
+ private Map<String, AppInfo> cachedAppInfo = new ConcurrentHashMap<>();
|
|
|
|
|
|
private RegistryOperations registryClient;
|
|
|
private CuratorFramework curatorClient;
|
|
@@ -210,7 +217,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
// Write the definition first and then submit - AM will read the definition
|
|
|
createDirAndPersistApp(appDir, service);
|
|
|
ApplicationId appId = submitApp(service);
|
|
|
- cachedAppIds.put(serviceName, appId);
|
|
|
+ cachedAppInfo.put(serviceName, new AppInfo(appId, service
|
|
|
+ .getKerberosPrincipal().getPrincipalName()));
|
|
|
service.setId(appId.toString());
|
|
|
// update app definition with appId
|
|
|
persistAppDef(appDir, service);
|
|
@@ -224,8 +232,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
Service persistedService =
|
|
|
ServiceApiUtil.loadService(fs, serviceName);
|
|
|
if (!StringUtils.isEmpty(persistedService.getId())) {
|
|
|
- cachedAppIds.put(persistedService.getName(),
|
|
|
- ApplicationId.fromString(persistedService.getId()));
|
|
|
+ cachedAppInfo.put(persistedService.getName(), new AppInfo(
|
|
|
+ ApplicationId.fromString(persistedService.getId()),
|
|
|
+ persistedService.getKerberosPrincipal().getPrincipalName()));
|
|
|
} else {
|
|
|
throw new YarnException(persistedService.getName()
|
|
|
+ " appId is null, may be not submitted to YARN yet");
|
|
@@ -278,8 +287,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
throw new YarnException(
|
|
|
serviceName + " appId is null, may be not submitted to YARN yet");
|
|
|
}
|
|
|
- cachedAppIds.put(persistedService.getName(),
|
|
|
- ApplicationId.fromString(persistedService.getId()));
|
|
|
+ cachedAppInfo.put(persistedService.getName(), new AppInfo(
|
|
|
+ ApplicationId.fromString(persistedService.getId()), persistedService
|
|
|
+ .getKerberosPrincipal().getPrincipalName()));
|
|
|
return flexComponents(serviceName, componentCounts, persistedService);
|
|
|
}
|
|
|
|
|
@@ -328,7 +338,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
throw new YarnException(serviceName + " AM hostname is empty");
|
|
|
}
|
|
|
ClientAMProtocol proxy =
|
|
|
- createAMProxy(appReport.getHost(), appReport.getRpcPort());
|
|
|
+ createAMProxy(serviceName, appReport);
|
|
|
proxy.flexComponents(requestBuilder.build());
|
|
|
for (Map.Entry<String, Long> entry : original.entrySet()) {
|
|
|
LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
|
|
@@ -366,8 +376,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
LOG.info("Stopping service {}, with appId = {}", serviceName, currentAppId);
|
|
|
try {
|
|
|
ClientAMProtocol proxy =
|
|
|
- createAMProxy(report.getHost(), report.getRpcPort());
|
|
|
- cachedAppIds.remove(serviceName);
|
|
|
+ createAMProxy(serviceName, report);
|
|
|
+ cachedAppInfo.remove(serviceName);
|
|
|
if (proxy != null) {
|
|
|
// try to stop the app gracefully.
|
|
|
StopRequestProto request = StopRequestProto.newBuilder().build();
|
|
@@ -406,8 +416,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
}
|
|
|
}
|
|
|
} catch (IOException | YarnException | InterruptedException e) {
|
|
|
- LOG.info("Failed to stop " + serviceName
|
|
|
- + " gracefully, forcefully kill the app.");
|
|
|
+ LOG.info("Failed to stop " + serviceName + " gracefully due to: "
|
|
|
+ + e.getMessage() + ", forcefully kill the app.");
|
|
|
yarnClient.killApplication(currentAppId, "Forcefully kill the app");
|
|
|
}
|
|
|
return EXIT_SUCCESS;
|
|
@@ -421,7 +431,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
Path appDir = fs.buildClusterDirPath(serviceName);
|
|
|
FileSystem fileSystem = fs.getFileSystem();
|
|
|
// remove from the appId cache
|
|
|
- cachedAppIds.remove(serviceName);
|
|
|
+ cachedAppInfo.remove(serviceName);
|
|
|
if (fileSystem.exists(appDir)) {
|
|
|
if (fileSystem.delete(appDir, true)) {
|
|
|
LOG.info("Successfully deleted service dir for " + serviceName + ": "
|
|
@@ -552,7 +562,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
// copy jars to hdfs and add to localResources
|
|
|
addJarResource(serviceName, localResources);
|
|
|
// add keytab if in secure env
|
|
|
- addKeytabResourceIfSecure(fs, localResources, conf, serviceName);
|
|
|
+ addKeytabResourceIfSecure(fs, localResources, app);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
printLocalResources(localResources);
|
|
|
}
|
|
@@ -581,6 +591,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
amLaunchContext.setCommands(Collections.singletonList(cmdStr));
|
|
|
amLaunchContext.setEnvironment(env);
|
|
|
amLaunchContext.setLocalResources(localResources);
|
|
|
+ addHdfsDelegationTokenIfSecure(amLaunchContext);
|
|
|
submissionContext.setAMContainerSpec(amLaunchContext);
|
|
|
yarnClient.submitApplication(submissionContext);
|
|
|
return submissionContext.getApplicationId();
|
|
@@ -771,38 +782,75 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
return appJson;
|
|
|
}
|
|
|
|
|
|
+ private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
|
|
|
+ throws IOException {
|
|
|
+ if (!UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Credentials credentials = new Credentials();
|
|
|
+ String tokenRenewer = YarnClientUtils.getRmPrincipal(getConfig());
|
|
|
+ if (StringUtils.isEmpty(tokenRenewer)) {
|
|
|
+ throw new IOException(
|
|
|
+ "Can't get Master Kerberos principal for the RM to use as renewer");
|
|
|
+ }
|
|
|
+ // Get hdfs dt
|
|
|
+ final org.apache.hadoop.security.token.Token<?>[] tokens =
|
|
|
+ fs.getFileSystem().addDelegationTokens(tokenRenewer, credentials);
|
|
|
+ if (tokens != null && tokens.length != 0) {
|
|
|
+ for (Token<?> token : tokens) {
|
|
|
+ LOG.debug("Got DT: " + token);
|
|
|
+ }
|
|
|
+ DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
+ credentials.writeTokenStorageToStream(dob);
|
|
|
+ ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
+ amContext.setTokens(fsTokens);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
|
|
|
- Map<String, LocalResource> localResource, Configuration conf,
|
|
|
- String serviceName) throws IOException, BadConfigException {
|
|
|
+ Map<String, LocalResource> localResource, Service service)
|
|
|
+ throws IOException, YarnException {
|
|
|
if (!UserGroupInformation.isSecurityEnabled()) {
|
|
|
return;
|
|
|
}
|
|
|
- String keytabPreInstalledOnHost =
|
|
|
- conf.get(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
|
|
|
- if (StringUtils.isEmpty(keytabPreInstalledOnHost)) {
|
|
|
- String amKeytabName =
|
|
|
- conf.get(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
|
|
|
- String keytabDir = conf.get(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
|
|
|
- Path keytabPath =
|
|
|
- fileSystem.buildKeytabPath(keytabDir, amKeytabName, serviceName);
|
|
|
- if (fileSystem.getFileSystem().exists(keytabPath)) {
|
|
|
- LocalResource keytabRes =
|
|
|
- fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
|
|
|
- localResource
|
|
|
- .put(YarnServiceConstants.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
|
|
|
- LOG.info("Adding AM keytab on hdfs: " + keytabPath);
|
|
|
- } else {
|
|
|
- LOG.warn("No keytab file was found at {}.", keytabPath);
|
|
|
- if (conf.getBoolean(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
|
|
|
- throw new BadConfigException("No keytab file was found at %s.",
|
|
|
- keytabPath);
|
|
|
- } else {
|
|
|
- LOG.warn("The AM will be "
|
|
|
- + "started without a kerberos authenticated identity. "
|
|
|
- + "The service is therefore not guaranteed to remain "
|
|
|
- + "operational beyond 24 hours.");
|
|
|
- }
|
|
|
+ String principalName = service.getKerberosPrincipal().getPrincipalName();
|
|
|
+ if (StringUtils.isEmpty(principalName)) {
|
|
|
+ LOG.warn("No Kerberos principal name specified for " + service.getName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if(StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
|
|
|
+ LOG.warn("No Kerberos keytab specified for " + service.getName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ URI keytabURI;
|
|
|
+ try {
|
|
|
+ keytabURI = new URI(service.getKerberosPrincipal().getKeytab());
|
|
|
+ } catch (URISyntaxException e) {
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (keytabURI.getScheme()) {
|
|
|
+ case "hdfs":
|
|
|
+ Path keytabOnhdfs = new Path(keytabURI);
|
|
|
+ if (!fileSystem.getFileSystem().exists(keytabOnhdfs)) {
|
|
|
+ LOG.warn(service.getName() + "'s keytab (principalName = " +
|
|
|
+ principalName + ") doesn't exist at: " + keytabOnhdfs);
|
|
|
+ return;
|
|
|
}
|
|
|
+ LocalResource keytabRes =
|
|
|
+ fileSystem.createAmResource(keytabOnhdfs, LocalResourceType.FILE);
|
|
|
+ localResource.put(String.format(YarnServiceConstants.KEYTAB_LOCATION,
|
|
|
+ service.getName()), keytabRes);
|
|
|
+ LOG.debug("Adding " + service.getName() + "'s keytab for " +
|
|
|
+ "localization, uri = " + keytabOnhdfs);
|
|
|
+ break;
|
|
|
+ case "file":
|
|
|
+ LOG.debug("Using a keytab from localhost: " + keytabURI);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ LOG.warn("Unsupported URI scheme " + keytabURI);
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -856,7 +904,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
return "";
|
|
|
}
|
|
|
ClientAMProtocol amProxy =
|
|
|
- createAMProxy(appReport.getHost(), appReport.getRpcPort());
|
|
|
+ createAMProxy(appReport.getName(), appReport);
|
|
|
GetStatusResponseProto response =
|
|
|
amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
|
|
|
return response.getStatus();
|
|
@@ -886,7 +934,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
return appSpec;
|
|
|
}
|
|
|
ClientAMProtocol amProxy =
|
|
|
- createAMProxy(appReport.getHost(), appReport.getRpcPort());
|
|
|
+ createAMProxy(serviceName, appReport);
|
|
|
GetStatusResponseProto response =
|
|
|
amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
|
|
|
appSpec = jsonSerDeser.fromJson(response.getStatus());
|
|
@@ -935,18 +983,37 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- protected ClientAMProtocol createAMProxy(String host, int port)
|
|
|
- throws IOException {
|
|
|
+ protected ClientAMProtocol createAMProxy(String serviceName,
|
|
|
+ ApplicationReport appReport) throws IOException, YarnException {
|
|
|
+
|
|
|
+ if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
+ if (!cachedAppInfo.containsKey(serviceName)) {
|
|
|
+ Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
|
|
|
+ cachedAppInfo.put(serviceName, new AppInfo(appReport.getApplicationId(),
|
|
|
+ persistedService.getKerberosPrincipal().getPrincipalName()));
|
|
|
+ }
|
|
|
+ String principalName = cachedAppInfo.get(serviceName).principalName;
|
|
|
+ // Inject the principal into hadoop conf, because Hadoop
|
|
|
+ // SaslRpcClient#getServerPrincipal requires a config for the
|
|
|
+ // principal
|
|
|
+ if (!StringUtils.isEmpty(principalName)) {
|
|
|
+ getConfig().set(PRINCIPAL, principalName);
|
|
|
+ } else {
|
|
|
+ throw new YarnException("No principal specified in the persisted " +
|
|
|
+ "service definition, fail to connect to AM.");
|
|
|
+ }
|
|
|
+ }
|
|
|
InetSocketAddress address =
|
|
|
- NetUtils.createSocketAddrForHost(host, port);
|
|
|
+ NetUtils.createSocketAddrForHost(appReport.getHost(), appReport
|
|
|
+ .getRpcPort());
|
|
|
return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
|
|
|
UserGroupInformation.getCurrentUser(), rpc, address);
|
|
|
}
|
|
|
|
|
|
public synchronized ApplicationId getAppId(String serviceName)
|
|
|
throws IOException, YarnException {
|
|
|
- if (cachedAppIds.containsKey(serviceName)) {
|
|
|
- return cachedAppIds.get(serviceName);
|
|
|
+ if (cachedAppInfo.containsKey(serviceName)) {
|
|
|
+ return cachedAppInfo.get(serviceName).appId;
|
|
|
}
|
|
|
Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
|
|
|
if (persistedService == null) {
|
|
@@ -954,7 +1021,18 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|
|
+ " doesn't exist on hdfs. Please check if the app exists in RM");
|
|
|
}
|
|
|
ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId());
|
|
|
- cachedAppIds.put(serviceName, currentAppId);
|
|
|
+ cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService
|
|
|
+ .getKerberosPrincipal().getPrincipalName()));
|
|
|
return currentAppId;
|
|
|
}
|
|
|
+
|
|
|
+ private static class AppInfo {
|
|
|
+ ApplicationId appId;
|
|
|
+ String principalName;
|
|
|
+
|
|
|
+ AppInfo(ApplicationId appId, String principalName) {
|
|
|
+ this.appId = appId;
|
|
|
+ this.principalName = principalName;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|