
HDFS-17601. [ARR] RouterRpcServer supports asynchronous rpc. (#7108). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb committed 5 months ago
commit 118061c443
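
Note: all of the new *Async methods in the diff below share the same shape. On the handler thread they only build an asynchronous call chain with the AsyncUtil helpers (asyncComplete, asyncTry/asyncCatch, asyncApply, asyncForEach) and then return a typed placeholder through asyncReturn(clazz); the real value materialises when the chain completes and is collected by the caller via syncReturn(clazz). A minimal sketch of that shape, condensed from invokeAtAvailableNsAsync below — the method name and the comments are illustrative only and describe the apparent contract of the AsyncUtil helpers as they are used in this patch, not a verbatim excerpt:

    // Sketch: the structural pattern shared by the async methods in this patch.
    <T> T exampleAsync(RemoteMethod method, Class<T> clazz) throws IOException {
      asyncComplete(null);                 // seed the async chain so later steps can attach to it
      asyncTry(() -> {
        // schedule the RPC; its eventual result becomes the current value of the chain
        getRPCClient().invokeSingle(subclusterResolver.getDefaultNamespace(), method, clazz);
      });
      asyncCatch((CatchFunction<T, IOException>) (ret, ex) -> {
        throw ex;                          // rethrow here, or recover and keep the chain going
      }, IOException.class);
      return asyncReturn(clazz);           // typed placeholder, not the real result
    }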

+ 276 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java

@@ -37,6 +37,12 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_R
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncApply;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncCatch;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncComplete;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncForEach;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncReturn;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.asyncTry;
 import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
 
 import java.io.FileNotFoundException;
@@ -49,6 +55,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -69,6 +76,9 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil;
+import org.apache.hadoop.hdfs.server.federation.router.async.ApplyFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.AsyncCatchFunction;
+import org.apache.hadoop.hdfs.server.federation.router.async.CatchFunction;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
@@ -793,6 +803,46 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return invokeOnNs(method, clazz, io, nss);
   }
 
+  /**
+   * Invokes the method at the default namespace; if the default namespace is
+   * not available, retries at the other available namespaces.
+   * Asynchronous version of the invokeAtAvailableNs method.
+   * @param <T> expected return type.
+   * @param method the remote method.
+   * @param clazz Class for the return type.
+   * @return the response received after invoking the method.
+   * @throws IOException If the invocation cannot be completed on any namespace.
+   */
+  <T> T invokeAtAvailableNsAsync(RemoteMethod method, Class<T> clazz)
+      throws IOException {
+    String nsId = subclusterResolver.getDefaultNamespace();
+    // If the default NS is not present, return the result from the first available namespace.
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    // If no namespace is available, throw IOException.
+    IOException io = new IOException("No namespace available.");
+
+    asyncComplete(null);
+    if (!nsId.isEmpty()) {
+      asyncTry(() -> {
+        getRPCClient().invokeSingle(nsId, method, clazz);
+      });
+
+      asyncCatch((AsyncCatchFunction<T, IOException>)(res, ioe) -> {
+        if (!clientProto.isUnavailableSubclusterException(ioe)) {
+          LOG.debug("{} exception cannot be retried",
+              ioe.getClass().getSimpleName());
+          throw ioe;
+        }
+        nss.removeIf(n -> n.getNameserviceId().equals(nsId));
+        invokeOnNsAsync(method, clazz, io, nss);
+      }, IOException.class);
+    } else {
+      // No default NS available; try the other namespaces.
+      invokeOnNsAsync(method, clazz, io, nss);
+    }
+    return asyncReturn(clazz);
+  }
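
Caller-side usage sketch, mirroring testInvokeAtAvailableNsAsync in the new unit test further down; rpcServer here stands for a RouterRpcServer wired to a RouterAsyncRpcClient, as set up in that test:

    RemoteMethod method = new RemoteMethod("getStoragePolicies");
    rpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class); // returns immediately
    BlockStoragePolicy[] policies = syncReturn(BlockStoragePolicy[].class); // waits for the chain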
+
   /**
    * Invoke the method sequentially on available namespaces,
    * throw no namespace available exception, if no namespaces are available.
@@ -826,6 +876,61 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     throw ioe;
   }
 
+  /**
+   * Invoke the method sequentially on the available namespaces, throwing the
+   * supplied exception if no namespace is available.
+   * Asynchronous version of the invokeOnNs method.
+   * @param <T> expected return type.
+   * @param method the remote method.
+   * @param clazz  Class for the return type.
+   * @param ioe    IOException to throw if no namespace is available.
+   * @param nss    List of namespaces in the federation.
+   * @return the response received after invoking the method.
+   * @throws IOException If the invocation fails on all available namespaces.
+   */
+  <T> T invokeOnNsAsync(RemoteMethod method, Class<T> clazz, IOException ioe,
+      Set<FederationNamespaceInfo> nss) throws IOException {
+    if (nss.isEmpty()) {
+      throw ioe;
+    }
+
+    asyncComplete(null);
+    Iterator<FederationNamespaceInfo> nsIterator = nss.iterator();
+    asyncForEach(nsIterator, (foreach, fnInfo) -> {
+      String nsId = fnInfo.getNameserviceId();
+      LOG.debug("Invoking {} on namespace {}", method, nsId);
+      asyncTry(() -> {
+        getRPCClient().invokeSingle(nsId, method, clazz);
+        asyncApply(result -> {
+          if (result != null) {
+            foreach.breakNow();
+            return result;
+          }
+          return null;
+        });
+      });
+
+      asyncCatch((CatchFunction<T, IOException>)(ret, ex) -> {
+        LOG.debug("Failed to invoke {} on namespace {}", method, nsId, ex);
+        // If the tried namespace is unavailable, ignore the exception and try
+        // the next one; otherwise rethrow the received exception.
+        if (!clientProto.isUnavailableSubclusterException(ex)) {
+          throw ex;
+        }
+        return null;
+      }, IOException.class);
+    });
+
+    asyncApply(obj -> {
+      if (obj == null) {
+        // Couldn't get a response from any of the namespaces; throw ioe.
+        throw ioe;
+      }
+      return obj;
+    });
+
+    return asyncReturn(clazz);
+  }
+
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
@@ -877,6 +982,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
    */
   RemoteLocation getCreateLocation(final String src) throws IOException {
     final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    if (isAsync()) {
+      getCreateLocationAsync(src, locations);
+      return asyncReturn(RemoteLocation.class);
+    }
     return getCreateLocation(src, locations);
   }
 
@@ -913,6 +1022,44 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return createLocation;
   }
 
+  /**
+   * Get the location to create a file. It checks whether the file already
+   * exists in one of the locations.
+   * Asynchronous version of getCreateLocation method.
+   *
+   * @param src Path of the file to check.
+   * @param locations Prefetched locations for the file.
+   * @return The remote location for this file.
+   * @throws IOException If the file has no creation location.
+   */
+  RemoteLocation getCreateLocationAsync(
+      final String src, final List<RemoteLocation> locations)
+      throws IOException {
+
+    if (locations == null || locations.isEmpty()) {
+      throw new IOException("Cannot get locations to create " + src);
+    }
+
+    RemoteLocation createLocation = locations.get(0);
+    if (locations.size() > 1) {
+      asyncTry(() -> {
+        getExistingLocationAsync(src, locations);
+        asyncApply((ApplyFunction<RemoteLocation, RemoteLocation>) existingLocation -> {
+          if (existingLocation != null) {
+            LOG.debug("{} already exists in {}.", src, existingLocation);
+            return existingLocation;
+          }
+          return createLocation;
+        });
+      });
+      asyncCatch((o, e) -> createLocation, FileNotFoundException.class);
+    } else {
+      asyncComplete(createLocation);
+    }
+
+    return asyncReturn(RemoteLocation.class);
+  }
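
Usage sketch for getCreateLocationAsync, taken from testGetCreateLocationAsync below (rpcServer is again a RouterRpcServer backed by the async RPC client):

    List<RemoteLocation> locations = rpcServer.getLocationsForPath("/testdir", true);
    rpcServer.getCreateLocationAsync("/testdir", locations);
    RemoteLocation createLocation = syncReturn(RemoteLocation.class);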
+
   /**
    * Gets the remote location where the file exists.
    * @param src the name of file.
@@ -934,6 +1081,31 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return null;
   }
 
+  /**
+   * Gets the remote location where the file exists.
+   * Asynchronous version of getExistingLocation method.
+   * @param src the name of file.
+   * @param locations all the remote locations.
+   * @return the remote location of the file if it exists, else null.
+   * @throws IOException in case of any exception.
+   */
+  private RemoteLocation getExistingLocationAsync(String src,
+      List<RemoteLocation> locations) throws IOException {
+    RemoteMethod method = new RemoteMethod("getFileInfo",
+        new Class<?>[] {String.class}, new RemoteParam());
+    getRPCClient().invokeConcurrent(
+        locations, method, true, false, HdfsFileStatus.class);
+    asyncApply((ApplyFunction<Map<RemoteLocation, HdfsFileStatus>, Object>) results -> {
+      for (RemoteLocation loc : locations) {
+        if (results.get(loc) != null) {
+          return loc;
+        }
+      }
+      return null;
+    });
+    return asyncReturn(RemoteLocation.class);
+  }
+
   @Override // ClientProtocol
   public LastBlockWithStatus append(String src, final String clientName,
       final EnumSetWritable<CreateFlag> flag) throws IOException {
@@ -1188,6 +1360,38 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return toArray(datanodes, DatanodeInfo.class);
   }
 
+  /**
+   * Get the datanode report with a timeout.
+   * Asynchronous version of the getDatanodeReport method.
+   * @param type Type of the datanode.
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getDatanodeReportAsync(
+      DatanodeReportType type, boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getDatanodeReport",
+        new Class<?>[] {DatanodeReportType.class}, type);
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
+        DatanodeInfo[]>) results -> {
+        updateDnMap(results, datanodesMap);
+        // Map -> Array
+        Collection<DatanodeInfo> datanodes = datanodesMap.values();
+        return toArray(datanodes, DatanodeInfo.class);
+      });
+    return asyncReturn(DatanodeInfo[].class);
+  }
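
Usage sketch, mirroring testGetDatanodeReportAsync below: the async variant is fired first and the DatanodeInfo[] result is then collected with syncReturn:

    rpcServer.getDatanodeReportAsync(HdfsConstants.DatanodeReportType.ALL, true, 0);
    DatanodeInfo[] datanodes = syncReturn(DatanodeInfo[].class);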
+
   @Override // ClientProtocol
   public DatanodeStorageReport[] getDatanodeStorageReport(
       DatanodeReportType type) throws IOException {
@@ -1206,6 +1410,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return getDatanodeStorageReportMap(type, true, -1);
   }
 
+  public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
+      DatanodeReportType type) throws IOException {
+    return getDatanodeStorageReportMapAsync(type, true, -1);
+  }
+
   /**
    * Get the list of datanodes per subcluster.
    *
@@ -1238,6 +1447,42 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return ret;
   }
 
+  /**
+   * Get the list of datanodes per subcluster.
+   * Asynchronous version of getDatanodeStorageReportMap method.
+   * @param type Type of the datanodes to get.
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data results
+   *          successfully received are returned.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return nsId to datanode list.
+   * @throws IOException If the method cannot be invoked remotely.
+   */
+  public Map<String, DatanodeStorageReport[]> getDatanodeStorageReportMapAsync(
+      DatanodeReportType type, boolean requireResponse, long timeOutMs)
+      throws IOException {
+
+    Map<String, DatanodeStorageReport[]> ret = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getDatanodeStorageReport",
+        new Class<?>[] {DatanodeReportType.class}, type);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    getRPCClient().invokeConcurrent(
+            nss, method, requireResponse, false, timeOutMs, DatanodeStorageReport[].class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeStorageReport[]>,
+        Map<String, DatanodeStorageReport[]>>) results -> {
+        for (Entry<FederationNamespaceInfo, DatanodeStorageReport[]> entry :
+            results.entrySet()) {
+          FederationNamespaceInfo ns = entry.getKey();
+          String nsId = ns.getNameserviceId();
+          DatanodeStorageReport[] result = entry.getValue();
+          ret.put(nsId, result);
+        }
+        return ret;
+      });
+    return asyncReturn(ret.getClass());
+  }
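
The map-returning variant is consumed the same way; note that the unit test below retrieves the result as a raw Map via syncReturn(Map.class) (sketch mirroring that test):

    rpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
    Map<String, DatanodeStorageReport[]> reports = syncReturn(Map.class);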
+
   @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action, boolean isChecked)
       throws IOException {
@@ -2053,6 +2298,37 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return toArray(datanodes, DatanodeInfo.class);
   }
 
+  /**
+   * Get the slow running datanodes report with a timeout.
+   * Asynchronous version of the getSlowDatanodeReport method.
+   *
+   * @param requireResponse If we require all the namespaces to report.
+   * @param timeOutMs Time out for the reply in milliseconds.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  public DatanodeInfo[] getSlowDatanodeReportAsync(boolean requireResponse, long timeOutMs)
+      throws IOException {
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeInfo> datanodesMap = new LinkedHashMap<>();
+    RemoteMethod method = new RemoteMethod("getSlowDatanodeReport");
+
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    getRPCClient().invokeConcurrent(nss, method, requireResponse, false,
+            timeOutMs, DatanodeInfo[].class);
+
+    asyncApply((ApplyFunction<Map<FederationNamespaceInfo, DatanodeInfo[]>,
+        DatanodeInfo[]>) results -> {
+        updateDnMap(results, datanodesMap);
+        // Map -> Array
+        Collection<DatanodeInfo> datanodes = datanodesMap.values();
+        return toArray(datanodes, DatanodeInfo.class);
+      });
+
+    return asyncReturn(DatanodeInfo[].class);
+  }
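
The unit test below also checks that the asynchronous slow-datanode report matches the synchronous one; condensed (sketch, same rpcServer assumption as above):

    DatanodeInfo[] viaSync = rpcServer.getSlowDatanodeReport(true, 0);
    rpcServer.getSlowDatanodeReportAsync(true, 0);
    DatanodeInfo[] viaAsync = syncReturn(DatanodeInfo[].class);
    // the two reports are expected to be identical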
+
   private void updateDnMap(Map<FederationNamespaceInfo, DatanodeInfo[]> results,
       Map<String, DatanodeInfo> datanodesMap) {
     for (Entry<FederationNamespaceInfo, DatanodeInfo[]> entry :

+ 191 - 0
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAsyncRpcServer.java

@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.ipc.CallerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
+import static org.apache.hadoop.hdfs.server.federation.router.async.AsyncUtil.syncReturn;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Used to test the async functionality of {@link RouterRpcServer}.
+ */
+public class TestRouterAsyncRpcServer {
+  private static Configuration routerConf;
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+  private static String ns0;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFs;
+  private RouterRpcServer asyncRouterRpcServer;
+
+  @BeforeClass
+  public static void setUpCluster() throws Exception {
+    cluster = new MiniRouterDFSCluster(true, 1, 2,
+        DEFAULT_HEARTBEAT_INTERVAL_MS, 1000);
+    cluster.setNumDatanodesPerNameservice(3);
+
+    cluster.startCluster();
+
+    // Making one Namenode active per nameservice
+    if (cluster.isHighAvailability()) {
+      for (String ns : cluster.getNameservices()) {
+        cluster.switchToActive(ns, NAMENODES[0]);
+        cluster.switchToStandby(ns, NAMENODES[1]);
+      }
+    }
+    // Start routers with only an RPC service
+    routerConf = new RouterConfigBuilder()
+        .rpc()
+        .build();
+
+    // Reduce the number of RPC client threads to make it easy to overload the Router
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
+    routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
+    // We decrease the DN report cache expiry time to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    // Start the routers
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+    cluster.waitActiveNamespaces();
+    ns0 = cluster.getNameservices().get(0);
+  }
+
+  @AfterClass
+  public static void shutdownCluster() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    router = cluster.getRandomRouter();
+    routerFs = router.getFileSystem();
+    RouterRpcServer routerRpcServer = router.getRouterRpcServer();
+    routerRpcServer.initAsyncThreadPool();
+    RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
+        routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
+        routerRpcServer.getRPCMonitor(),
+        routerRpcServer.getRouterStateIdContext());
+    asyncRouterRpcServer = Mockito.spy(routerRpcServer);
+    Mockito.when(asyncRouterRpcServer.getRPCClient()).thenReturn(asyncRpcClient);
+
+    // Create mock locations
+    MockResolver resolver = (MockResolver) router.getRouter().getSubclusterResolver();
+    resolver.addLocation("/", ns0, "/");
+    FsPermission permission = new FsPermission("705");
+    routerFs.mkdirs(new Path("/testdir"), permission);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // clear client context
+    CallerContext.setCurrent(null);
+    if (routerFs != null) {
+      // Clean up the test directory before closing the file system
+      boolean delete = routerFs.delete(new Path("/testdir"));
+      assertTrue(delete);
+      routerFs.close();
+    }
+  }
+
+  /**
+   * Test that the async RPC server can invoke a method at an available Namenode.
+   */
+  @Test
+  public void testInvokeAtAvailableNsAsync() throws Exception {
+    RemoteMethod method = new RemoteMethod("getStoragePolicies");
+    asyncRouterRpcServer.invokeAtAvailableNsAsync(method, BlockStoragePolicy[].class);
+    BlockStoragePolicy[] storagePolicies = syncReturn(BlockStoragePolicy[].class);
+    assertEquals(8, storagePolicies.length);
+  }
+
+  /**
+   * Test get create location async.
+   */
+  @Test
+  public void testGetCreateLocationAsync() throws Exception {
+    final List<RemoteLocation> locations =
+        asyncRouterRpcServer.getLocationsForPath("/testdir", true);
+    asyncRouterRpcServer.getCreateLocationAsync("/testdir", locations);
+    RemoteLocation remoteLocation = syncReturn(RemoteLocation.class);
+    assertNotNull(remoteLocation);
+    assertEquals(ns0, remoteLocation.getNameserviceId());
+  }
+
+  /**
+   * Test get datanode report async.
+   */
+  @Test
+  public void testGetDatanodeReportAsync() throws Exception {
+    asyncRouterRpcServer.getDatanodeReportAsync(
+        HdfsConstants.DatanodeReportType.ALL, true, 0);
+    DatanodeInfo[] datanodeInfos = syncReturn(DatanodeInfo[].class);
+    assertEquals(3, datanodeInfos.length);
+
+    // Get the namespace where the datanode is located
+    asyncRouterRpcServer.getDatanodeStorageReportMapAsync(HdfsConstants.DatanodeReportType.ALL);
+    Map<String, DatanodeStorageReport[]> map = syncReturn(Map.class);
+    assertEquals(1, map.size());
+    assertEquals(3, map.get(ns0).length);
+
+    DatanodeInfo[] slowDatanodeReport1 =
+        asyncRouterRpcServer.getSlowDatanodeReport(true, 0);
+
+    asyncRouterRpcServer.getSlowDatanodeReportAsync(true, 0);
+    DatanodeInfo[] slowDatanodeReport2 = syncReturn(DatanodeInfo[].class);
+    assertEquals(slowDatanodeReport1, slowDatanodeReport2);
+  }
+}