|
@@ -21,15 +21,17 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
|
|
|
-import org.apache.avro.ipc.Server;
|
|
|
+import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
import org.apache.hadoop.security.Groups;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
|
|
+import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
|
import org.apache.hadoop.security.authorize.ProxyUsers;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
|
@@ -45,11 +47,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.Refresh
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshNodesResponse;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshQueuesRequest;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshQueuesResponse;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshServiceAclsRequest;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshServiceAclsResponse;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
|
|
import org.apache.hadoop.yarn.service.AbstractService;
|
|
|
|
|
|
public class AdminService extends AbstractService implements RMAdminProtocol {
|
|
@@ -60,7 +65,11 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
|
|
private final ResourceScheduler scheduler;
|
|
|
private final RMContext rmContext;
|
|
|
private final NodesListManager nodesListManager;
|
|
|
-
|
|
|
+
|
|
|
+ private final ClientRMService clientRMService;
|
|
|
+ private final ApplicationMasterService applicationMasterService;
|
|
|
+ private final ResourceTrackerService resourceTrackerService;
|
|
|
+
|
|
|
private Server server;
|
|
|
private InetSocketAddress masterServiceAddress;
|
|
|
private AccessControlList adminAcl;
|
|
@@ -69,12 +78,18 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
|
|
RecordFactoryProvider.getRecordFactory(null);
|
|
|
|
|
|
public AdminService(Configuration conf, ResourceScheduler scheduler,
|
|
|
- RMContext rmContext, NodesListManager nodesListManager) {
|
|
|
+ RMContext rmContext, NodesListManager nodesListManager,
|
|
|
+ ClientRMService clientRMService,
|
|
|
+ ApplicationMasterService applicationMasterService,
|
|
|
+ ResourceTrackerService resourceTrackerService) {
|
|
|
super(AdminService.class.getName());
|
|
|
this.conf = conf;
|
|
|
this.scheduler = scheduler;
|
|
|
this.rmContext = rmContext;
|
|
|
this.nodesListManager = nodesListManager;
|
|
|
+ this.clientRMService = clientRMService;
|
|
|
+ this.applicationMasterService = applicationMasterService;
|
|
|
+ this.resourceTrackerService = resourceTrackerService;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -99,6 +114,14 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
|
|
conf, null,
|
|
|
conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
|
|
|
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
|
|
|
+
|
|
|
+ // Enable service authorization?
|
|
|
+ if (conf.getBoolean(
|
|
|
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
|
|
|
+ false)) {
|
|
|
+ refreshServiceAcls(conf, new RMPolicyProvider());
|
|
|
+ }
|
|
|
+
|
|
|
this.server.start();
|
|
|
super.start();
|
|
|
}
|
|
@@ -106,7 +129,7 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
|
|
@Override
|
|
|
public void stop() {
|
|
|
if (this.server != null) {
|
|
|
- this.server.close();
|
|
|
+ this.server.stop();
|
|
|
}
|
|
|
super.stop();
|
|
|
}
|
|
@@ -222,4 +245,33 @@ public class AdminService extends AbstractService implements RMAdminProtocol {
|
|
|
|
|
|
return recordFactory.newRecordInstance(RefreshAdminAclsResponse.class);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RefreshServiceAclsResponse refreshServiceAcls(
|
|
|
+ RefreshServiceAclsRequest request) throws YarnRemoteException {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ if (!conf.getBoolean(
|
|
|
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
|
|
|
+ false)) {
|
|
|
+ throw RPCUtil.getRemoteException(
|
|
|
+ new IOException("Service Authorization (" +
|
|
|
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION +
|
|
|
+ ") not enabled."));
|
|
|
+ }
|
|
|
+
|
|
|
+ PolicyProvider policyProvider = new RMPolicyProvider();
|
|
|
+
|
|
|
+ refreshServiceAcls(conf, policyProvider);
|
|
|
+ clientRMService.refreshServiceAcls(conf, policyProvider);
|
|
|
+ applicationMasterService.refreshServiceAcls(conf, policyProvider);
|
|
|
+ resourceTrackerService.refreshServiceAcls(conf, policyProvider);
|
|
|
+
|
|
|
+ return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
|
|
|
+ }
|
|
|
+
|
|
|
+ void refreshServiceAcls(Configuration configuration,
|
|
|
+ PolicyProvider policyProvider) {
|
|
|
+ this.server.refreshServiceAcl(configuration, policyProvider);
|
|
|
+ }
|
|
|
+
|
|
|
}
|