|
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
-import java.security.PrivilegedAction;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -50,12 +49,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.client.ClientRMProxy;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
-import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
|
|
|
/**
|
|
|
* Registers/unregisters to RM and sends heartbeats to RM.
|
|
@@ -265,27 +263,12 @@ public abstract class RMCommunicator extends AbstractService
|
|
|
|
|
|
protected ApplicationMasterProtocol createSchedulerProxy() {
|
|
|
final Configuration conf = getConfig();
|
|
|
- final YarnRPC rpc = YarnRPC.create(conf);
|
|
|
- final InetSocketAddress serviceAddr = conf.getSocketAddr(
|
|
|
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
|
|
|
|
|
|
- UserGroupInformation currentUser;
|
|
|
try {
|
|
|
- currentUser = UserGroupInformation.getCurrentUser();
|
|
|
+ return ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
|
|
|
} catch (IOException e) {
|
|
|
throw new YarnRuntimeException(e);
|
|
|
}
|
|
|
-
|
|
|
- // CurrentUser should already have AMToken loaded.
|
|
|
- return currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
|
|
- @Override
|
|
|
- public ApplicationMasterProtocol run() {
|
|
|
- return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class,
|
|
|
- serviceAddr, conf);
|
|
|
- }
|
|
|
- });
|
|
|
}
|
|
|
|
|
|
protected abstract void heartbeat() throws Exception;
|