|
@@ -37,6 +37,7 @@ import static org.junit.Assert.fail;
|
|
|
import java.io.IOException;
|
|
|
import java.lang.reflect.Method;
|
|
|
import java.net.URISyntaxException;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.EnumSet;
|
|
@@ -47,6 +48,7 @@ import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
|
import java.util.Random;
|
|
|
import java.util.Set;
|
|
|
+import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
@@ -120,6 +122,11 @@ public class TestRouterRpc {
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(TestRouterRpc.class);
|
|
|
|
|
|
+ private static final int NUM_SUBCLUSTERS = 2;
|
|
|
+ // We need at least 6 DNs to test Erasure Coding with RS-6-3-64k
|
|
|
+ private static final int NUM_DNS = 6;
|
|
|
+
|
|
|
+
|
|
|
private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
|
|
|
new Comparator<ErasureCodingPolicyInfo>() {
|
|
|
public int compare(
|
|
@@ -165,9 +172,9 @@ public class TestRouterRpc {
|
|
|
|
|
|
@BeforeClass
|
|
|
public static void globalSetUp() throws Exception {
|
|
|
- cluster = new MiniRouterDFSCluster(false, 2);
|
|
|
- // We need 6 DNs to test Erasure Coding with RS-6-3-64k
|
|
|
- cluster.setNumDatanodesPerNameservice(6);
|
|
|
+ cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
|
|
|
+ cluster.setNumDatanodesPerNameservice(NUM_DNS);
|
|
|
+ cluster.setIndependentDNs();
|
|
|
|
|
|
// Start NNs and DNs and wait until ready
|
|
|
cluster.startCluster();
|
|
@@ -586,8 +593,13 @@ public class TestRouterRpc {
|
|
|
|
|
|
DatanodeInfo[] combinedData =
|
|
|
routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
|
|
|
+ final Map<Integer, String> routerDNMap = new TreeMap<>();
|
|
|
+ for (DatanodeInfo dn : combinedData) {
|
|
|
+ String subcluster = dn.getNetworkLocation().split("/")[1];
|
|
|
+ routerDNMap.put(dn.getXferPort(), subcluster);
|
|
|
+ }
|
|
|
|
|
|
- Set<Integer> individualData = new HashSet<Integer>();
|
|
|
+ final Map<Integer, String> nnDNMap = new TreeMap<>();
|
|
|
for (String nameservice : cluster.getNameservices()) {
|
|
|
NamenodeContext n = cluster.getNamenode(nameservice, null);
|
|
|
DFSClient client = n.getClient();
|
|
@@ -597,10 +609,10 @@ public class TestRouterRpc {
|
|
|
for (int i = 0; i < data.length; i++) {
|
|
|
// Collect unique DNs based on their xfer port
|
|
|
DatanodeInfo info = data[i];
|
|
|
- individualData.add(info.getXferPort());
|
|
|
+ nnDNMap.put(info.getXferPort(), nameservice);
|
|
|
}
|
|
|
}
|
|
|
- assertEquals(combinedData.length, individualData.size());
|
|
|
+ assertEquals(nnDNMap, routerDNMap);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1234,7 +1246,7 @@ public class TestRouterRpc {
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testErasureCoding() throws IOException {
|
|
|
+ public void testErasureCoding() throws Exception {
|
|
|
|
|
|
LOG.info("List the available erasurce coding policies");
|
|
|
ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies();
|
|
@@ -1340,8 +1352,22 @@ public class TestRouterRpc {
|
|
|
|
|
|
LOG.info("Check the stats");
|
|
|
ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats();
|
|
|
- ECBlockGroupStats statsNamenode = nnProtocol.getECBlockGroupStats();
|
|
|
- assertEquals(statsNamenode.toString(), statsRouter.toString());
|
|
|
+ ECBlockGroupStats statsNamenode = getNamenodeECBlockGroupStats();
|
|
|
+ assertEquals(statsNamenode, statsRouter);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the EC stats from all namenodes and aggregate them.
|
|
|
+ * @return Aggregated EC stats from all namenodes.
|
|
|
+ * @throws Exception If we cannot get the stats.
|
|
|
+ */
|
|
|
+ private ECBlockGroupStats getNamenodeECBlockGroupStats() throws Exception {
|
|
|
+ List<ECBlockGroupStats> nnStats = new ArrayList<>();
|
|
|
+ for (NamenodeContext nnContext : cluster.getNamenodes()) {
|
|
|
+ ClientProtocol cp = nnContext.getClient().getNamenode();
|
|
|
+ nnStats.add(cp.getECBlockGroupStats());
|
|
|
+ }
|
|
|
+ return ECBlockGroupStats.merge(nnStats);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1375,9 +1401,9 @@ public class TestRouterRpc {
|
|
|
router.getRouter().getNamenodeMetrics();
|
|
|
final String jsonString0 = metrics.getLiveNodes();
|
|
|
|
|
|
- // We should have 12 nodes in total
|
|
|
+ // We should have the nodes in all the subclusters
|
|
|
JSONObject jsonObject = new JSONObject(jsonString0);
|
|
|
- assertEquals(12, jsonObject.names().length());
|
|
|
+ assertEquals(NUM_SUBCLUSTERS * NUM_DNS, jsonObject.names().length());
|
|
|
|
|
|
// We should be caching this information
|
|
|
String jsonString1 = metrics.getLiveNodes();
|