
HDFS-15417. RBF: Get the datanode report from cache for federation WebHDFS operations (#2080)

Ye Ni committed 5 years ago · parent commit e820baa6e6
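
Before this change, each WebHDFS chooseDatanode call on the Router fetched a
fresh datanode report over RPC, a fan-out call to every downstream namenode.
This change keeps the full report per DatanodeReportType in a Guava
LoadingCache inside RouterRpcServer, refreshes it proactively on a fixed
schedule, and makes the WebHDFS path read from that cache.

The refresh-ahead pattern in isolation, as a minimal self-contained sketch
rather than the patch itself: expensiveFetch stands in for the Router's
datanode report RPC, and String for DatanodeReportType.

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class RefreshAheadCacheSketch {

  private final ListeningExecutorService reloader =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

  private final LoadingCache<String, String> cache =
      CacheBuilder.newBuilder().build(new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
          return expensiveFetch(key); // first access blocks on the fetch
        }

        @Override
        public ListenableFuture<String> reload(String key, String oldValue) {
          // Readers keep seeing oldValue while the new value loads.
          return reloader.submit(() -> expensiveFetch(key));
        }
      });

  public RefreshAheadCacheSketch(long refreshMs) {
    // Proactively refresh every key loaded so far, so steady-state reads
    // never pay the fetch latency.
    Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
        () -> cache.asMap().keySet().forEach(cache::refresh),
        0, refreshMs, TimeUnit.MILLISECONDS);
  }

  private String expensiveFetch(String key) {
    return key + "@" + System.currentTimeMillis(); // placeholder work
  }
}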

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java (+121 -2)

@@ -26,6 +26,8 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -41,7 +43,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
@@ -219,6 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   private static final ThreadLocal<UserGroupInformation> CUR_USER =
       new ThreadLocal<>();
 
+  /** DN type -> full DN report. */
+  private final LoadingCache<DatanodeReportType, DatanodeInfo[]> dnCache;
+
   /**
    * Construct a router RPC server.
    *
@@ -361,6 +378,23 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     this.nnProto = new RouterNamenodeProtocol(this);
     this.clientProto = new RouterClientProtocol(conf, this);
     this.routerProto = new RouterUserProtocol(this);
+
+    long dnCacheExpire = conf.getTimeDuration(
+        DN_REPORT_CACHE_EXPIRE,
+        DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
+    this.dnCache = CacheBuilder.newBuilder()
+        .build(new DatanodeReportCacheLoader());
+
+    // Actively refresh the DN cache at the configured interval
+    Executors
+        .newSingleThreadScheduledExecutor()
+        .scheduleWithFixedDelay(() -> this.dnCache
+                .asMap()
+                .keySet()
+                .parallelStream()
+                .forEach((key) -> this.dnCache.refresh(key)),
+            0,
+            dnCacheExpire, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -868,6 +902,50 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
     return clientProto.getDatanodeReport(type);
   }
 
+  /**
+   * Get the datanode report from cache.
+   *
+   * @param type Type of the datanode report.
+   * @return List of datanodes.
+   * @throws IOException If it cannot get the report.
+   */
+  DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type)
+      throws IOException {
+    try {
+      DatanodeInfo[] dns = this.dnCache.get(type);
+      if (dns == null) {
+        LOG.debug("Get null DN report from cache");
+        dns = getCachedDatanodeReportImpl(type);
+        this.dnCache.put(type, dns);
+      }
+      return dns;
+    } catch (ExecutionException e) {
+      LOG.error("Cannot get the DN report for {}", type, e);
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else {
+        throw new IOException(cause);
+      }
+    }
+  }
+
+  private DatanodeInfo[] getCachedDatanodeReportImpl(
+      final DatanodeReportType type) throws IOException {
+    // We need to get the DNs as a privileged user
+    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
+    RouterRpcServer.setCurrentUser(loginUser);
+
+    try {
+      DatanodeInfo[] dns = clientProto.getDatanodeReport(type);
+      LOG.debug("Refresh cached DN report with {} datanodes", dns.length);
+      return dns;
+    } finally {
+      // Reset ugi to remote user for remaining operations.
+      RouterRpcServer.resetCurrentUser();
+    }
+  }
+
   /**
    * Get the datanode report with a timeout.
    * @param type Type of the datanode.
@@ -1748,4 +1826,45 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   public String[] getGroupsForUser(String user) throws IOException {
     return routerProto.getGroupsForUser(user);
   }
-}
+
+  /**
+   * Loads the datanode report into the cache and refreshes it asynchronously.
+   */
+  private class DatanodeReportCacheLoader
+      extends CacheLoader<DatanodeReportType, DatanodeInfo[]> {
+
+    private ListeningExecutorService executorService;
+
+    DatanodeReportCacheLoader() {
+      ThreadFactory threadFactory = new ThreadFactoryBuilder()
+          .setNameFormat("DatanodeReport-Cache-Reload")
+          .setDaemon(true)
+          .build();
+
+      executorService = MoreExecutors.listeningDecorator(
+          Executors.newSingleThreadExecutor(threadFactory));
+    }
+
+    @Override
+    public DatanodeInfo[] load(DatanodeReportType type) throws Exception {
+      return getCachedDatanodeReportImpl(type);
+    }
+
+    /**
+     * Override the reload method to provide an asynchronous implementation,
+     * so that the query will not be slowed down by the cache refresh. It
+     * will return the old cache value and schedule a background refresh.
+     */
+    @Override
+    public ListenableFuture<DatanodeInfo[]> reload(
+        final DatanodeReportType type, DatanodeInfo[] oldValue)
+        throws Exception {
+      return executorService.submit(new Callable<DatanodeInfo[]>() {
+        @Override
+        public DatanodeInfo[] call() throws Exception {
+          return load(type);
+        }
+      });
+    }
+  }
+}
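
The refresh interval comes from conf.getTimeDuration(DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS), so it is tunable
per deployment, and getTimeDuration also accepts unit-suffixed values such as
"30s". A hypothetical tuning sketch follows; the property string below is an
assumption, since this diff shows only the DN_REPORT_CACHE_EXPIRE constant
and not the value it resolves to.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;

public class DnReportCacheTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Assumed property name: only the DN_REPORT_CACHE_EXPIRE constant is
    // visible in this diff, not the string it resolves to.
    conf.setTimeDuration("dfs.federation.router.dn-report.cache-expire",
        30, TimeUnit.SECONDS);
    // Equivalent suffixed form; getTimeDuration parses unit suffixes.
    conf.set("dfs.federation.router.dn-report.cache-expire", "30s");
  }
}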

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java (+1 -8)

@@ -454,19 +454,12 @@ public class RouterWebHdfsMethods extends NamenodeWebHdfsMethods {
   private DatanodeInfo chooseDatanode(final Router router,
       final String path, final HttpOpParam.Op op, final long openOffset,
       final String excludeDatanodes) throws IOException {
-    // We need to get the DNs as a privileged user
     final RouterRpcServer rpcServer = getRPCServer(router);
-    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-    RouterRpcServer.setCurrentUser(loginUser);
-
     DatanodeInfo[] dns = null;
     try {
-      dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
+      dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
     } catch (IOException e) {
       LOG.error("Cannot get the datanodes from the RPC server", e);
-    } finally {
-      // Reset ugi to remote user for remaining operations.
-      RouterRpcServer.resetCurrentUser();
     }
 
     HashSet<Node> excludes = new HashSet<Node>();
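
The privileged-user switch that previously wrapped this call now lives inside
RouterRpcServer#getCachedDatanodeReportImpl (above), so the WebHDFS path no
longer touches UGI at all. The pattern in isolation, as a hedged sketch: the
ThreadLocal stands in for the Router's package-private CUR_USER, and
fetchReport is a placeholder for the datanode report RPC.

import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;

public class PrivilegedFetchSketch {
  // Stand-in for RouterRpcServer's package-private CUR_USER thread-local.
  private static final ThreadLocal<UserGroupInformation> CUR_USER =
      new ThreadLocal<>();

  static String fetchAsLoginUser() throws IOException {
    // Run the fetch as the privileged login user.
    CUR_USER.set(UserGroupInformation.getLoginUser());
    try {
      return fetchReport();
    } finally {
      // Always restore the caller's identity, even if the fetch throws.
      CUR_USER.remove();
    }
  }

  private static String fetchReport() {
    return "report"; // placeholder for the datanode report RPC
  }
}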

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java (+69 -1)

@@ -67,6 +67,8 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -216,6 +218,12 @@ public class TestRouterRpc {
     // Register and verify all NNs with all routers
     cluster.registerNamenodes();
     cluster.waitNamenodeRegistration();
+
+    // Decrease the DN heartbeat expire interval so stopped DNs are marked dead faster
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(5000);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(5000);
   }
 
   @AfterClass
@@ -1777,6 +1785,66 @@ public class TestRouterRpc {
     assertArrayEquals(group, result);
   }
 
+  @Test
+  public void testGetCachedDatanodeReport() throws Exception {
+    RouterRpcServer rpcServer = router.getRouter().getRpcServer();
+    final DatanodeInfo[] datanodeReport =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+
+    // We should have 12 nodes in total
+    assertEquals(12, datanodeReport.length);
+
+    // We should be caching this information
+    DatanodeInfo[] datanodeReport1 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertArrayEquals(datanodeReport1, datanodeReport);
+
+    // Stop one datanode
+    MiniDFSCluster miniDFSCluster = getCluster().getCluster();
+    DataNodeProperties dnprop = miniDFSCluster.stopDataNode(0);
+
+    // We wait until the cached value is updated
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DatanodeInfo[] dn = null;
+        try {
+          dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+        } catch (IOException ex) {
+          LOG.error("Error on getCachedDatanodeReport", ex);
+        }
+        return dn != null && !Arrays.equals(datanodeReport, dn);
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    final DatanodeInfo[] datanodeReport2 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertEquals(datanodeReport.length - 1, datanodeReport2.length);
+
+    // Restart the DN we just stopped
+    miniDFSCluster.restartDataNode(dnprop);
+    miniDFSCluster.waitActive();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        DatanodeInfo[] dn = null;
+        try {
+          dn = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+        } catch (IOException ex) {
+          LOG.error("Error on getCachedDatanodeReport", ex);
+        }
+        return dn != null && datanodeReport.length == dn.length;
+      }
+    }, 500, 5 * 1000);
+
+    // The cache should be updated now
+    final DatanodeInfo[] datanodeReport3 =
+        rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
+    assertEquals(datanodeReport.length, datanodeReport3.length);
+  }
+
   /**
    * Check the erasure coding policies in the Router and the Namenode.
    * @return The erasure coding policies.
@@ -1814,4 +1882,4 @@ public class TestRouterRpc {
     }
     return null;
   }
-}
+}