Browse Source

HDFS-13410. RBF: Support federation with no subclusters. Contributed by Inigo Goiri.

(cherry picked from commit a92200f4a6cec57b7080d1cd6e2a20d79d772dd6)
Yiqun Lin 7 years ago
parent
commit
0a216c3285

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java

@@ -932,7 +932,9 @@ public class RouterRpcClient {
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = method.getMethod();
     final Method m = method.getMethod();
 
 
-    if (locations.size() == 1) {
+    if (locations.isEmpty()) {
+      throw new IOException("No remote locations available");
+    } else if (locations.size() == 1) {
       // Shortcut, just one call
       // Shortcut, just one call
       T location = locations.iterator().next();
       T location = locations.iterator().next();
       String ns = location.getNameserviceId();
       String ns = location.getNameserviceId();

+ 54 - 16
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java

@@ -17,23 +17,25 @@
  */
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 package org.apache.hadoop.hdfs.server.federation.router;
 
 
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.net.URISyntaxException;
+import java.net.InetSocketAddress;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.service.Service.STATE;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.Test;
 
 
@@ -77,27 +79,31 @@ public class TestRouter {
             "0.0.0.0");
             "0.0.0.0");
   }
   }
 
 
-  @AfterClass
-  public static void destroy() {
-  }
-
-  @Before
-  public void setup() throws IOException, URISyntaxException {
-  }
-
-  @After
-  public void cleanup() {
-  }
-
   private static void testRouterStartup(Configuration routerConfig)
   private static void testRouterStartup(Configuration routerConfig)
       throws InterruptedException, IOException {
       throws InterruptedException, IOException {
     Router router = new Router();
     Router router = new Router();
     assertEquals(STATE.NOTINITED, router.getServiceState());
     assertEquals(STATE.NOTINITED, router.getServiceState());
+    assertEquals(RouterServiceState.UNINITIALIZED, router.getRouterState());
     router.init(routerConfig);
     router.init(routerConfig);
+    if (routerConfig.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
+      assertEquals(RouterServiceState.SAFEMODE, router.getRouterState());
+    } else {
+      assertEquals(RouterServiceState.INITIALIZING, router.getRouterState());
+    }
     assertEquals(STATE.INITED, router.getServiceState());
     assertEquals(STATE.INITED, router.getServiceState());
     router.start();
     router.start();
+    if (routerConfig.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) {
+      assertEquals(RouterServiceState.SAFEMODE, router.getRouterState());
+    } else {
+      assertEquals(RouterServiceState.RUNNING, router.getRouterState());
+    }
     assertEquals(STATE.STARTED, router.getServiceState());
     assertEquals(STATE.STARTED, router.getServiceState());
     router.stop();
     router.stop();
+    assertEquals(RouterServiceState.SHUTDOWN, router.getRouterState());
     assertEquals(STATE.STOPPED, router.getServiceState());
     assertEquals(STATE.STOPPED, router.getServiceState());
     router.close();
     router.close();
   }
   }
@@ -114,6 +120,9 @@ public class TestRouter {
     // Rpc only
     // Rpc only
     testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
     testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
 
 
+    // Safemode only
+    testRouterStartup(new RouterConfigBuilder(conf).rpc().safemode().build());
+
     // Metrics only
     // Metrics only
     testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
     testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
 
 
@@ -147,4 +156,33 @@ public class TestRouter {
 
 
     router.close();
     router.close();
   }
   }
+
+  @Test
+  public void testRouterRpcWithNoSubclusters() throws IOException {
+
+    Router router = new Router();
+    router.init(new RouterConfigBuilder(conf).rpc().build());
+    router.start();
+
+    InetSocketAddress serverAddress = router.getRpcServerAddress();
+    DFSClient dfsClient = new DFSClient(serverAddress, conf);
+
+    try {
+      dfsClient.create("/test.txt", false);
+      fail("Create with no subclusters should fail");
+    } catch (RemoteException e) {
+      assertExceptionContains("Cannot find locations for /test.txt", e);
+    }
+
+    try {
+      dfsClient.datanodeReport(DatanodeReportType.LIVE);
+      fail("Get datanode reports with no subclusters should fail");
+    } catch (IOException e) {
+      assertExceptionContains("No remote locations available", e);
+    }
+
+    dfsClient.close();
+    router.stop();
+    router.close();
+  }
 }
 }

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -1155,7 +1156,25 @@ public class TestRouterRpc {
     }, 500, 5 * 1000);
     }, 500, 5 * 1000);
 
 
     // The cache should be updated now
     // The cache should be updated now
-    assertNotEquals(jsonString0, metrics.getLiveNodes());
+    final String jsonString2 = metrics.getLiveNodes();
+    assertNotEquals(jsonString0, jsonString2);
+
+
+    // Without any subcluster available, we should return an empty list
+    MockResolver resolver =
+        (MockResolver) router.getRouter().getNamenodeResolver();
+    resolver.cleanRegistrations();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return !jsonString2.equals(metrics.getLiveNodes());
+      }
+    }, 500, 5 * 1000);
+    assertEquals("{}", metrics.getLiveNodes());
+
+    // Reset the registrations again
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
   }
   }
 
 
   /**
   /**