|
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.ServiceLoader;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -36,8 +37,8 @@ import org.apache.hadoop.ipc.RPC;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.Master;
|
|
import org.apache.hadoop.mapred.Master;
|
|
-import org.apache.hadoop.mapreduce.ClientFactory.DefaultClientFactory;
|
|
|
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
|
|
|
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
|
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
|
import org.apache.hadoop.mapreduce.util.ConfigUtil;
|
|
import org.apache.hadoop.mapreduce.util.ConfigUtil;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
@@ -53,6 +54,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
@InterfaceStability.Evolving
|
|
@InterfaceStability.Evolving
|
|
public class Cluster {
|
|
public class Cluster {
|
|
private static final Log LOG = LogFactory.getLog(Cluster.class);
|
|
private static final Log LOG = LogFactory.getLog(Cluster.class);
|
|
|
|
+ private ClientProtocolProvider clientProtocolProvider;
|
|
private ClientProtocol client;
|
|
private ClientProtocol client;
|
|
private UserGroupInformation ugi;
|
|
private UserGroupInformation ugi;
|
|
private Configuration conf;
|
|
private Configuration conf;
|
|
@@ -68,15 +70,28 @@ public class Cluster {
|
|
public Cluster(Configuration conf) throws IOException {
|
|
public Cluster(Configuration conf) throws IOException {
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
- client = ClientFactory.create(conf);
|
|
|
|
|
|
+ for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
|
|
|
|
+ ClientProtocol clientProtocol = provider.create(conf);
|
|
|
|
+ if (clientProtocol != null) {
|
|
|
|
+ clientProtocolProvider = provider;
|
|
|
|
+ client = clientProtocol;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
|
|
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
|
|
throws IOException {
|
|
throws IOException {
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
this.ugi = UserGroupInformation.getCurrentUser();
|
|
- client = (new DefaultClientFactory()).createJTClient(jobTrackAddr,
|
|
|
|
- conf);
|
|
|
|
|
|
+ for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
|
|
|
|
+ ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
|
|
|
|
+ if (clientProtocol != null) {
|
|
|
|
+ clientProtocolProvider = provider;
|
|
|
|
+ client = clientProtocol;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -92,10 +107,7 @@ public class Cluster {
|
|
* Close the <code>Cluster</code>.
|
|
* Close the <code>Cluster</code>.
|
|
*/
|
|
*/
|
|
public synchronized void close() throws IOException {
|
|
public synchronized void close() throws IOException {
|
|
- // FIXME
|
|
|
|
- //if (!(client instanceof LocalJobRunner)) {
|
|
|
|
- RPC.stopProxy(client);
|
|
|
|
- //}
|
|
|
|
|
|
+ clientProtocolProvider.close(client);
|
|
}
|
|
}
|
|
|
|
|
|
private Job[] getJobs(JobStatus[] stats) throws IOException {
|
|
private Job[] getJobs(JobStatus[] stats) throws IOException {
|