|
@@ -33,13 +33,17 @@ import static org.junit.Assert.assertTrue;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
|
|
@@ -473,6 +477,56 @@ public class TestStateStoreMembershipState extends TestStateStoreBase {
|
|
|
}, 100, 3000);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testNamespaceInfoWithUnavailableNameNodeRegistration()
|
|
|
+ throws IOException {
|
|
|
+ // Populate the state store with one ACTIVE NameNode entry
|
|
|
+ // and one UNAVAILABLE NameNode entry
|
|
|
+ // 1) ns0:nn0 - ACTIVE
|
|
|
+ // 2) ns0:nn1 - UNAVAILABLE
|
|
|
+ List<MembershipState> registrationList = new ArrayList<>();
|
|
|
+ String router = ROUTERS[0];
|
|
|
+ String ns = NAMESERVICES[0];
|
|
|
+ String rpcAddress = "testrpcaddress";
|
|
|
+ String serviceAddress = "testserviceaddress";
|
|
|
+ String lifelineAddress = "testlifelineaddress";
|
|
|
+ String blockPoolId = "testblockpool";
|
|
|
+ String clusterId = "testcluster";
|
|
|
+ String webScheme = "http";
|
|
|
+ String webAddress = "testwebaddress";
|
|
|
+ boolean safemode = false;
|
|
|
+
|
|
|
+ MembershipState record = MembershipState.newInstance(
|
|
|
+ router, ns, NAMENODES[0], clusterId, blockPoolId,
|
|
|
+ rpcAddress, serviceAddress, lifelineAddress, webScheme,
|
|
|
+ webAddress, FederationNamenodeServiceState.ACTIVE, safemode);
|
|
|
+ registrationList.add(record);
|
|
|
+
|
|
|
+ // Set empty clusterId and blockPoolId for UNAVAILABLE NameNode
|
|
|
+ record = MembershipState.newInstance(
|
|
|
+ router, ns, NAMENODES[1], "", "",
|
|
|
+ rpcAddress, serviceAddress, lifelineAddress, webScheme,
|
|
|
+ webAddress, FederationNamenodeServiceState.UNAVAILABLE, safemode);
|
|
|
+ registrationList.add(record);
|
|
|
+
|
|
|
+ registerAndLoadRegistrations(registrationList);
|
|
|
+
|
|
|
+ GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance();
|
|
|
+ GetNamespaceInfoResponse response
|
|
|
+ = membershipStore.getNamespaceInfo(request);
|
|
|
+ Set<FederationNamespaceInfo> namespaces = response.getNamespaceInfo();
|
|
|
+
|
|
|
+ // Verify only one namespace is registered
|
|
|
+ assertEquals(1, namespaces.size());
|
|
|
+
|
|
|
+ // Verify the registered namespace has a valid pair of clusterId
|
|
|
+ // and blockPoolId derived from ACTIVE NameNode
|
|
|
+ FederationNamespaceInfo namespace = namespaces.iterator().next();
|
|
|
+ assertEquals(ns, namespace.getNameserviceId());
|
|
|
+ assertEquals(clusterId, namespace.getClusterId());
|
|
|
+ assertEquals(blockPoolId, namespace.getBlockPoolId());
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get a single namenode membership record from the store.
|
|
|
*
|