|
@@ -24,13 +24,13 @@ import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Method;
|
|
|
+import java.lang.reflect.InvocationTargetException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Random;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.Set;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.Callable;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -842,13 +842,27 @@ public class FederationClientInterceptor
|
|
|
// Generate parallel Callable tasks
|
|
|
for (SubClusterId subClusterId : subClusterIds) {
|
|
|
callables.add(() -> {
|
|
|
- ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
|
|
|
- String methodName = request.getMethodName();
|
|
|
- Class<?>[] types = request.getTypes();
|
|
|
- Object[] params = request.getParams();
|
|
|
- Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
|
|
|
- Object result = method.invoke(protocol, params);
|
|
|
- return Pair.of(subClusterId, result);
|
|
|
+ try {
|
|
|
+ ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId);
|
|
|
+ String methodName = request.getMethodName();
|
|
|
+ Class<?>[] types = request.getTypes();
|
|
|
+ Object[] params = request.getParams();
|
|
|
+ Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
|
|
|
+ Object result = method.invoke(protocol, params);
|
|
|
+ return Pair.of(subClusterId, result);
|
|
|
+ } catch (Exception e) {
|
|
|
+ Throwable cause = e.getCause();
|
|
|
+ // We use Callable. If the exception thrown here is InvocationTargetException,
|
|
|
+ // it is a wrapped exception. We need to get the real cause of the error.
|
|
|
+ if (cause != null && cause instanceof InvocationTargetException) {
|
|
|
+ cause = cause.getCause();
|
|
|
+ }
|
|
|
+ String errMsg = (cause.getMessage() != null) ? cause.getMessage() : "UNKNOWN";
|
|
|
+ YarnException yarnException =
|
|
|
+ new YarnException(String.format("subClusterId %s exec %s error %s.",
|
|
|
+ subClusterId, request.getMethodName(), errMsg), e);
|
|
|
+ return Pair.of(subClusterId, yarnException);
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -862,8 +876,11 @@ public class FederationClientInterceptor
|
|
|
Pair<SubClusterId, Object> pair = future.get();
|
|
|
subClusterId = pair.getKey();
|
|
|
Object result = pair.getValue();
|
|
|
+ if (result instanceof YarnException) {
|
|
|
+ throw YarnException.class.cast(result);
|
|
|
+ }
|
|
|
results.put(subClusterId, clazz.cast(result));
|
|
|
- } catch (InterruptedException | ExecutionException e) {
|
|
|
+ } catch (InterruptedException | ExecutionException | YarnException e) {
|
|
|
Throwable cause = e.getCause();
|
|
|
LOG.error("Cannot execute {} on {} : {}", request.getMethodName(),
|
|
|
subClusterId.getId(), cause.getMessage());
|
|
@@ -877,9 +894,8 @@ public class FederationClientInterceptor
|
|
|
// All sub-clusters return results to be considered successful,
|
|
|
// otherwise an exception will be thrown.
|
|
|
if (exceptions != null && !exceptions.isEmpty()) {
|
|
|
- Set<SubClusterId> subClusterIdSets = exceptions.keySet();
|
|
|
- throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " +
|
|
|
- StringUtils.join(subClusterIdSets, ","));
|
|
|
+ throw new YarnException("invokeConcurrent Failed = " +
|
|
|
+ StringUtils.join(exceptions.values(), ","));
|
|
|
}
|
|
|
|
|
|
// return result
|