|
@@ -18,13 +18,24 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.router.clientrm;
|
|
|
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
import java.io.IOException;
|
|
|
+import java.lang.reflect.Method;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
+import java.util.TreeMap;
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import org.apache.commons.lang3.NotImplementedException;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|
@@ -140,6 +151,7 @@ public class FederationClientInterceptor
|
|
|
private Random rand;
|
|
|
private RouterPolicyFacade policyFacade;
|
|
|
private RouterMetrics routerMetrics;
|
|
|
+ private ThreadPoolExecutor executorService;
|
|
|
private final Clock clock = new MonotonicClock();
|
|
|
|
|
|
@Override
|
|
@@ -149,6 +161,17 @@ public class FederationClientInterceptor
|
|
|
federationFacade = FederationStateStoreFacade.getInstance();
|
|
|
rand = new Random(System.currentTimeMillis());
|
|
|
|
|
|
+
|
|
|
+ int numThreads = getConf().getInt(
|
|
|
+ YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
|
|
|
+ YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
|
|
|
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
|
|
+ .setNameFormat("RPC Router Client-" + userName + "-%d ").build();
|
|
|
+
|
|
|
+ BlockingQueue workQueue = new LinkedBlockingQueue<>();
|
|
|
+ this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
|
|
|
+ 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
|
|
|
+
|
|
|
final Configuration conf = this.getConf();
|
|
|
|
|
|
try {
|
|
@@ -570,7 +593,72 @@ public class FederationClientInterceptor
|
|
|
@Override
|
|
|
public GetClusterMetricsResponse getClusterMetrics(
|
|
|
GetClusterMetricsRequest request) throws YarnException, IOException {
|
|
|
- throw new NotImplementedException("Code is not implemented");
|
|
|
+ Map<SubClusterId, SubClusterInfo> subclusters =
|
|
|
+ federationFacade.getSubClusters(true);
|
|
|
+ ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
|
|
|
+ new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
|
|
|
+ ArrayList<SubClusterId> clusterList = new ArrayList<>(subclusters.keySet());
|
|
|
+ Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics =
|
|
|
+ invokeConcurrent(clusterList, remoteMethod,
|
|
|
+ GetClusterMetricsResponse.class);
|
|
|
+ return RouterYarnClientUtils.merge(clusterMetrics.values());
|
|
|
+ }
|
|
|
+
|
|
|
+ <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
|
|
|
+ ClientMethod request, Class<R> clazz) throws YarnException, IOException {
|
|
|
+ List<Callable<Object>> callables = new ArrayList<>();
|
|
|
+ List<Future<Object>> futures = new ArrayList<>();
|
|
|
+ Map<SubClusterId, IOException> exceptions = new TreeMap<>();
|
|
|
+ for (SubClusterId subClusterId : clusterIds) {
|
|
|
+ callables.add(new Callable<Object>() {
|
|
|
+ @Override
|
|
|
+ public Object call() throws Exception {
|
|
|
+ ApplicationClientProtocol protocol =
|
|
|
+ getClientRMProxyForSubCluster(subClusterId);
|
|
|
+ Method method = ApplicationClientProtocol.class
|
|
|
+ .getDeclaredMethod(request.getMethodName(), request.getTypes());
|
|
|
+ return method.invoke(protocol, request.getParams());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ Map<SubClusterId, R> results = new TreeMap<>();
|
|
|
+ try {
|
|
|
+ futures.addAll(executorService.invokeAll(callables));
|
|
|
+ for (int i = 0; i < futures.size(); i++) {
|
|
|
+ SubClusterId subClusterId = clusterIds.get(i);
|
|
|
+ try {
|
|
|
+ Future<Object> future = futures.get(i);
|
|
|
+ Object result = future.get();
|
|
|
+ results.put(subClusterId, clazz.cast(result));
|
|
|
+ } catch (ExecutionException ex) {
|
|
|
+ Throwable cause = ex.getCause();
|
|
|
+ LOG.debug("Cannot execute {} on {}: {}", request.getMethodName(),
|
|
|
+ subClusterId.getId(), cause.getMessage());
|
|
|
+ IOException ioe;
|
|
|
+ if (cause instanceof IOException) {
|
|
|
+ ioe = (IOException) cause;
|
|
|
+ } else if (cause instanceof YarnException) {
|
|
|
+ throw (YarnException) cause;
|
|
|
+ } else {
|
|
|
+ ioe = new IOException(
|
|
|
+ "Unhandled exception while calling " + request.getMethodName()
|
|
|
+ + ": " + cause.getMessage(), cause);
|
|
|
+ }
|
|
|
+ // Store the exceptions
|
|
|
+ exceptions.put(subClusterId, ioe);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (results.isEmpty()) {
|
|
|
+ SubClusterId subClusterId = clusterIds.get(0);
|
|
|
+ IOException ioe = exceptions.get(subClusterId);
|
|
|
+ if (ioe != null) {
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new YarnException(e);
|
|
|
+ }
|
|
|
+ return results;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -732,4 +820,10 @@ public class FederationClientInterceptor
|
|
|
GetAllResourceTypeInfoRequest request) throws YarnException, IOException {
|
|
|
throw new NotImplementedException("Code is not implemented");
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void shutdown() {
|
|
|
+ executorService.shutdown();
|
|
|
+ super.shutdown();
|
|
|
+ }
|
|
|
}
|