|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs.server.federation.router;
|
|
|
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Constructor;
|
|
|
import java.lang.reflect.InvocationTargetException;
|
|
@@ -62,6 +63,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
|
|
/**
|
|
@@ -611,7 +613,7 @@ public class RouterRpcClient {
|
|
|
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
|
|
|
List<? extends FederationNamenodeContext> nns =
|
|
|
getNamenodesForNameservice(nsId);
|
|
|
- RemoteLocationContext loc = new RemoteLocation(nsId, "/");
|
|
|
+ RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
|
|
|
Class<?> proto = method.getProtocol();
|
|
|
Method m = method.getMethod();
|
|
|
Object[] params = method.getParams(loc);
|
|
@@ -727,8 +729,12 @@ public class RouterRpcClient {
|
|
|
firstResult = result;
|
|
|
}
|
|
|
} catch (IOException ioe) {
|
|
|
+ // Localize the exception
|
|
|
+
|
|
|
+ ioe = processException(ioe, loc);
|
|
|
+
|
|
|
// Record it and move on
|
|
|
- lastThrownException = (IOException) ioe;
|
|
|
+ lastThrownException = ioe;
|
|
|
if (firstThrownException == null) {
|
|
|
firstThrownException = lastThrownException;
|
|
|
}
|
|
@@ -756,6 +762,63 @@ public class RouterRpcClient {
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Exception messages might contain local subcluster paths. This method
|
|
|
+ * generates a new exception with the proper message.
|
|
|
+ * @param ioe Original IOException.
|
|
|
+ * @param loc Location we are processing.
|
|
|
+ * @return Exception processed for federation.
|
|
|
+ */
|
|
|
+ private IOException processException(
|
|
|
+ IOException ioe, RemoteLocationContext loc) {
|
|
|
+
|
|
|
+ if (ioe instanceof RemoteException) {
|
|
|
+ RemoteException re = (RemoteException)ioe;
|
|
|
+ String newMsg = processExceptionMsg(
|
|
|
+ re.getMessage(), loc.getDest(), loc.getSrc());
|
|
|
+ RemoteException newException =
|
|
|
+ new RemoteException(re.getClassName(), newMsg);
|
|
|
+ newException.setStackTrace(ioe.getStackTrace());
|
|
|
+ return newException;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (ioe instanceof FileNotFoundException) {
|
|
|
+ String newMsg = processExceptionMsg(
|
|
|
+ ioe.getMessage(), loc.getDest(), loc.getSrc());
|
|
|
+ FileNotFoundException newException = new FileNotFoundException(newMsg);
|
|
|
+ newException.setStackTrace(ioe.getStackTrace());
|
|
|
+ return newException;
|
|
|
+ }
|
|
|
+
|
|
|
+ return ioe;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Process a subcluster message and make it federated.
|
|
|
+ * @param msg Original exception message.
|
|
|
+ * @param dst Path in federation.
|
|
|
+ * @param src Path in the subcluster.
|
|
|
+ * @return Message processed for federation.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ static String processExceptionMsg(
|
|
|
+ final String msg, final String dst, final String src) {
|
|
|
+ if (dst.equals(src) || !dst.startsWith("/") || !src.startsWith("/")) {
|
|
|
+ return msg;
|
|
|
+ }
|
|
|
+
|
|
|
+ String newMsg = msg.replaceFirst(dst, src);
|
|
|
+ int minLen = Math.min(dst.length(), src.length());
|
|
|
+ for (int i = 0; newMsg.equals(msg) && i < minLen; i++) {
|
|
|
+ // Check if we can replace sub folders
|
|
|
+ String dst1 = dst.substring(0, dst.length() - 1 - i);
|
|
|
+ String src1 = src.substring(0, src.length() - 1 - i);
|
|
|
+ newMsg = msg.replaceFirst(dst1, src1);
|
|
|
+ }
|
|
|
+
|
|
|
+ return newMsg;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Checks if a result matches the required result class.
|
|
|
*
|