Ver código fonte

HDFS-12919. RBF: Support erasure coding methods in RouterRpcServer. Contributed by Inigo Goiri.

(cherry picked from commit b1b10007a43b1a2abb2a23fcb09aa554c55468e2)
Inigo Goiri 7 anos atrás
pai
commit
d63960700d

+ 45 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -666,9 +666,9 @@ public class RouterRpcClient {
    * @throws IOException if the success condition is not met, return the first
    *                     remote exception generated.
    */
-  public Object invokeSequential(
+  public <T> T invokeSequential(
       final List<? extends RemoteLocationContext> locations,
-      final RemoteMethod remoteMethod, Class<?> expectedResultClass,
+      final RemoteMethod remoteMethod, Class<T> expectedResultClass,
       Object expectedResultValue) throws IOException {
 
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
@@ -688,7 +688,9 @@ public class RouterRpcClient {
         if (isExpectedClass(expectedResultClass, result) &&
             isExpectedValue(expectedResultValue, result)) {
           // Valid result, stop here
-          return result;
+          @SuppressWarnings("unchecked")
+          T ret = (T)result;
+          return ret;
         }
         if (firstResult == null) {
           firstResult = result;
@@ -718,7 +720,9 @@ public class RouterRpcClient {
       throw firstThrownException;
     }
     // Return the last result, whether it is the value we are looking for or a
-    return firstResult;
+    @SuppressWarnings("unchecked")
+    T ret = (T)firstResult;
+    return ret;
   }
 
   /**
@@ -757,6 +761,28 @@ public class RouterRpcClient {
     }
   }
 
+  /**
+   * Invoke multiple concurrent proxy calls to different clients. Returns an
+   * array of results.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data results
+   *          successfully received are returned.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @throws IOException If all the calls throw an exception.
+   */
+  public <T extends RemoteLocationContext, R> void invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby) throws IOException {
+    invokeConcurrent(locations, method, requireResponse, standby, void.class);
+  }
+
   /**
    * Invokes multiple concurrent proxy calls to different clients. Returns an
    * array of results.
@@ -765,20 +791,24 @@ public class RouterRpcClient {
    * RemoteException or IOException.
    *
    * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
    * @param locations List of remote locations to call concurrently.
    * @param method The remote method and parameters to invoke.
    * @param requireResponse If true an exception will be thrown if all calls do
    *          not complete. If false exceptions are ignored and all data results
    *          successfully received are returned.
    * @param standby If the requests should go to the standby namenodes too.
+   * @param clazz Type of the remote return type.
    * @return Result of invoking the method per subcluster: nsId -> result.
    * @throws IOException If requiredResponse=true and any of the calls throw an
    *           exception.
    */
-  public <T extends RemoteLocationContext> Map<T, Object> invokeConcurrent(
+  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
       final Collection<T> locations, final RemoteMethod method,
-      boolean requireResponse, boolean standby) throws IOException {
-    return invokeConcurrent(locations, method, requireResponse, standby, -1);
+      boolean requireResponse, boolean standby, Class<R> clazz)
+          throws IOException {
+    return invokeConcurrent(
+        locations, method, requireResponse, standby, -1, clazz);
   }
 
   /**
@@ -788,6 +818,8 @@ public class RouterRpcClient {
    * Re-throws exceptions generated by the remote RPC call as either
    * RemoteException or IOException.
    *
+   * @param <T> The type of the remote location.
+    * @param <R> The type of the remote method return.
    * @param locations List of remote locations to call concurrently.
    * @param method The remote method and parameters to invoke.
    * @param requireResponse If true an exception will be thrown if all calls do
@@ -795,14 +827,15 @@ public class RouterRpcClient {
    *          successfully received are returned.
    * @param standby If the requests should go to the standby namenodes too.
    * @param timeOutMs Timeout for each individual call.
+   * @param clazz Type of the remote return type.
    * @return Result of invoking the method per subcluster: nsId -> result.
    * @throws IOException If requiredResponse=true and any of the calls throw an
    *           exception.
    */
   @SuppressWarnings("unchecked")
-  public <T extends RemoteLocationContext> Map<T, Object> invokeConcurrent(
+  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
       final Collection<T> locations, final RemoteMethod method,
-      boolean requireResponse, boolean standby, long timeOutMs)
+      boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
           throws IOException {
 
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
@@ -816,7 +849,7 @@ public class RouterRpcClient {
           getNamenodesForNameservice(ns);
       Object[] paramList = method.getParams(location);
       Object result = invokeMethod(ugi, namenodes, m, paramList);
-      return Collections.singletonMap(location, result);
+      return Collections.singletonMap(location, clazz.cast(result));
     }
 
     List<T> orderedLocations = new LinkedList<>();
@@ -866,14 +899,14 @@ public class RouterRpcClient {
       } else {
         futures = executorService.invokeAll(callables);
       }
-      Map<T, Object> results = new TreeMap<>();
+      Map<T, R> results = new TreeMap<>();
       Map<T, IOException> exceptions = new TreeMap<>();
       for (int i=0; i<futures.size(); i++) {
         T location = orderedLocations.get(i);
         try {
           Future<Object> future = futures.get(i);
           Object result = future.get();
-          results.put(location, result);
+          results.put(location, clazz.cast(result));
         } catch (CancellationException ce) {
           T loc = orderedLocations.get(i);
           String msg =

+ 72 - 55
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -28,12 +28,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Array;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -961,8 +963,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("getListing",
         new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
         new RemoteParam(), startAfter, needLocation);
-    Map<RemoteLocation, Object> listings =
-        rpcClient.invokeConcurrent(locations, method, false, false);
+    Map<RemoteLocation, DirectoryListing> listings =
+        rpcClient.invokeConcurrent(
+            locations, method, false, false, DirectoryListing.class);
 
     Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
     int totalRemainingEntries = 0;
@@ -971,9 +974,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     if (listings != null) {
       // Check the subcluster listing with the smallest name
       String lastName = null;
-      for (Entry<RemoteLocation, Object> entry : listings.entrySet()) {
+      for (Entry<RemoteLocation, DirectoryListing> entry :
+          listings.entrySet()) {
         RemoteLocation location = entry.getKey();
-        DirectoryListing listing = (DirectoryListing) entry.getValue();
+        DirectoryListing listing = entry.getValue();
         if (listing == null) {
           LOG.debug("Cannot get listing from {}", location);
         } else {
@@ -1097,11 +1101,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
 
     RemoteMethod method = new RemoteMethod("getStats");
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Object> results =
-        rpcClient.invokeConcurrent(nss, method, true, false);
+    Map<FederationNamespaceInfo, long[]> results =
+        rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
     long[] combinedData = new long[STATS_ARRAY_LENGTH];
-    for (Object o : results.values()) {
-      long[] data = (long[]) o;
+    for (long[] data : results.values()) {
       for (int i = 0; i < combinedData.length && i < data.length; i++) {
         if (data[i] >= 0) {
           combinedData[i] += data[i];
@@ -1134,11 +1137,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
         new Class<?>[] {DatanodeReportType.class}, type);
 
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Object> results =
-        rpcClient.invokeConcurrent(nss, method, true, false, timeOutMs);
-    for (Entry<FederationNamespaceInfo, Object> entry : results.entrySet()) {
+    Map<FederationNamespaceInfo, DatanodeInfo[]> results =
+        rpcClient.invokeConcurrent(
+            nss, method, true, false, timeOutMs, DatanodeInfo[].class);
+    for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :
+        results.entrySet()) {
       FederationNamespaceInfo ns = entry.getKey();
-      DatanodeInfo[] result = (DatanodeInfo[]) entry.getValue();
+      DatanodeInfo[] result = entry.getValue();
       for (DatanodeInfo node : result) {
         String nodeId = node.getXferAddr();
         if (!datanodesMap.containsKey(nodeId)) {
@@ -1224,17 +1229,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
         new Class<?>[] {SafeModeAction.class, boolean.class},
         action, isChecked);
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Object> results =
-        rpcClient.invokeConcurrent(nss, method, true, true);
+    Map<FederationNamespaceInfo, Boolean> results =
+        rpcClient.invokeConcurrent(nss, method, true, true, boolean.class);
 
     // We only report true if all the name space are in safe mode
     int numSafemode = 0;
-    for (Object result : results.values()) {
-      if (result instanceof Boolean) {
-        boolean safemode = (boolean) result;
-        if (safemode) {
-          numSafemode++;
-        }
+    for (boolean safemode : results.values()) {
+      if (safemode) {
+        numSafemode++;
       }
     }
     return numSafemode == results.size();
@@ -1247,18 +1249,14 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("restoreFailedStorage",
         new Class<?>[] {String.class}, arg);
     final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Object> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false);
+    Map<FederationNamespaceInfo, Boolean> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
 
     boolean success = true;
-    Object obj = ret;
-    @SuppressWarnings("unchecked")
-    Map<FederationNamespaceInfo, Boolean> results =
-        (Map<FederationNamespaceInfo, Boolean>)obj;
-    Collection<Boolean> sucesses = results.values();
-    for (boolean s : sucesses) {
+    for (boolean s : ret.values()) {
       if (!s) {
         success = false;
+        break;
       }
     }
     return success;
@@ -1279,17 +1277,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
 
     RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
     final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Object> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false);
+    Map<FederationNamespaceInfo, Long> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false, long.class);
 
     // Return the maximum txid
     long txid = 0;
-    Object obj = ret;
-    @SuppressWarnings("unchecked")
-    Map<FederationNamespaceInfo, Long> results =
-        (Map<FederationNamespaceInfo, Long>)obj;
-    Collection<Long> txids = results.values();
-    for (long t : txids) {
+    for (long t : ret.values()) {
       if (t > txid) {
         txid = t;
       }
@@ -1324,17 +1317,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("rollingUpgrade",
         new Class<?>[] {RollingUpgradeAction.class}, action);
     final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Object> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false);
+    Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
+        rpcClient.invokeConcurrent(
+            nss, method, true, false, RollingUpgradeInfo.class);
 
     // Return the first rolling upgrade info
     RollingUpgradeInfo info = null;
-    Object obj = ret;
-    @SuppressWarnings("unchecked")
-    Map<FederationNamespaceInfo, RollingUpgradeInfo> results =
-        (Map<FederationNamespaceInfo, RollingUpgradeInfo>)obj;
-    Collection<RollingUpgradeInfo> infos = results.values();
-    for (RollingUpgradeInfo infoNs : infos) {
+    for (RollingUpgradeInfo infoNs : ret.values()) {
       if (info == null && infoNs != null) {
         info = infoNs;
       }
@@ -1386,10 +1375,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
       final List<RemoteLocation> locations = getLocationsForPath(path, false);
       RemoteMethod method = new RemoteMethod("getContentSummary",
           new Class<?>[] {String.class}, new RemoteParam());
-      @SuppressWarnings("unchecked")
-      Map<String, ContentSummary> results =
-          (Map<String, ContentSummary>) ((Object)rpcClient.invokeConcurrent(
-              locations, method, false, false));
+      Map<RemoteLocation, ContentSummary> results =
+          rpcClient.invokeConcurrent(
+              locations, method, false, false, ContentSummary.class);
       summaries.addAll(results.values());
     } catch (FileNotFoundException e) {
       notFoundException = e;
@@ -1762,16 +1750,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     RemoteMethod method = new RemoteMethod(
         "getCurrentEditLogTxid", new Class<?>[] {});
     final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
-    Map<FederationNamespaceInfo, Object> ret =
-        rpcClient.invokeConcurrent(nss, method, true, false);
+    Map<FederationNamespaceInfo, Long> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false, long.class);
 
     // Return the maximum txid
     long txid = 0;
-    Object obj = ret;
-    @SuppressWarnings("unchecked")
-    Map<FederationNamespaceInfo, Long> results =
-        (Map<FederationNamespaceInfo, Long>)obj;
-    Collection<Long> txids = results.values();
+    Collection<Long> txids = ret.values();
     for (long t : txids) {
       if (t > txid) {
         txid = t;
@@ -2032,6 +2016,39 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
     return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
   }
 
+  /**
+   * Merge the outputs from multiple namespaces.
+   * @param map Namespace -> Output array.
+   * @param clazz Class of the values.
+   * @return Array with the outputs.
+   */
+  protected static <T> T[] merge(
+      Map<FederationNamespaceInfo, T[]> map, Class<T> clazz) {
+
+    // Put all results into a set to avoid repeats
+    Set<T> ret = new LinkedHashSet<>();
+    for (T[] values : map.values()) {
+      for (T val : values) {
+        ret.add(val);
+      }
+    }
+
+    return toArray(ret, clazz);
+  }
+
+  /**
+   * Convert a set of values into an array.
+   * @param set Input set.
+   * @param clazz Class of the values.
+   * @return Array with the values in set.
+   */
+  private static <T> T[] toArray(Set<T> set, Class<T> clazz) {
+    @SuppressWarnings("unchecked")
+    T[] combinedData = (T[]) Array.newInstance(clazz, set.size());
+    combinedData = set.toArray(combinedData);
+    return combinedData;
+  }
+
   /**
    * Get RPC metrics info.
    * @return The instance of FederationRPCMetrics.

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java

@@ -109,6 +109,8 @@ public class RouterDFSCluster {
   private List<RouterContext> routers;
   /** If the Namenodes are in high availability.*/
   private boolean highAvailability;
+  /** Number of datanodes per nameservice. */
+  private int numDatanodesPerNameservice = 2;
 
   /** Mini cluster. */
   private MiniDFSCluster cluster;
@@ -356,8 +358,8 @@ public class RouterDFSCluster {
         DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
   }
 
-  public RouterDFSCluster(boolean ha, int numNameservices, int numNamnodes) {
-    this(ha, numNameservices, numNamnodes,
+  public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
+    this(ha, numNameservices, numNamenodes,
         DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_CACHE_INTERVAL_MS);
   }
 
@@ -531,6 +533,10 @@ public class RouterDFSCluster {
     }
   }
 
+  public void setNumDatanodesPerNameservice(int num) {
+    this.numDatanodesPerNameservice = num;
+  }
+
   public String getNameservicesKey() {
     StringBuilder sb = new StringBuilder();
     for (String nsId : this.nameservices) {
@@ -658,7 +664,7 @@ public class RouterDFSCluster {
         nnConf.addResource(overrideConf);
       }
       cluster = new MiniDFSCluster.Builder(nnConf)
-          .numDataNodes(nameservices.size()*2)
+          .numDataNodes(nameservices.size() * numDatanodesPerNameservice)
           .nnTopology(topology)
           .build();
       cluster.waitActive();

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -153,9 +153,9 @@ public class TestRouterRpc {
     // Wait to ensure NN has fully created its test directories
     Thread.sleep(100);
 
-    // Pick a NS, namenode and router for this test
+    // Default namenode and random router for this test
     this.router = cluster.getRandomRouter();
-    this.ns = cluster.getRandomNameservice();
+    this.ns = cluster.getNameservices().get(0);
     this.namenode = cluster.getNamenode(ns, null);
 
     // Handles to the ClientProtocol interface