|
@@ -22,10 +22,13 @@ import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.permission.FsAction;
|
|
|
+import org.apache.hadoop.ha.HAServiceProtocol;
|
|
|
+import org.apache.hadoop.ha.HAServiceStatus;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
@@ -38,10 +41,12 @@ import org.mockito.Mockito;
|
|
|
import org.mockito.invocation.InvocationOnMock;
|
|
|
import org.mockito.stubbing.Answer;
|
|
|
|
|
|
+import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
|
+
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.fail;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
-
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
|
|
/**
|
|
|
* Tests for {@link ObserverReadProxyProvider} under various configurations of
|
|
@@ -56,7 +61,7 @@ public class TestObserverReadProxyProvider {
|
|
|
private Configuration conf;
|
|
|
|
|
|
private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
|
|
|
- private ClientProtocolAnswer[] namenodeAnswers;
|
|
|
+ private NameNodeAnswer[] namenodeAnswers;
|
|
|
private String[] namenodeAddrs;
|
|
|
|
|
|
@Before
|
|
@@ -70,32 +75,53 @@ public class TestObserverReadProxyProvider {
|
|
|
private void setupProxyProvider(int namenodeCount) throws Exception {
|
|
|
String[] namenodeIDs = new String[namenodeCount];
|
|
|
namenodeAddrs = new String[namenodeCount];
|
|
|
- namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
|
|
|
+ namenodeAnswers = new NameNodeAnswer[namenodeCount];
|
|
|
ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
|
|
|
Map<String, ClientProtocol> proxyMap = new HashMap<>();
|
|
|
+ HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount];
|
|
|
+ Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>();
|
|
|
for (int i = 0; i < namenodeCount; i++) {
|
|
|
namenodeIDs[i] = "nn" + i;
|
|
|
namenodeAddrs[i] = "namenode" + i + ".test:8020";
|
|
|
conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
|
|
|
"." + namenodeIDs[i], namenodeAddrs[i]);
|
|
|
- namenodeAnswers[i] = new ClientProtocolAnswer();
|
|
|
+ namenodeAnswers[i] = new NameNodeAnswer();
|
|
|
proxies[i] = mock(ClientProtocol.class);
|
|
|
- doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
|
|
|
- doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
|
|
|
+ doWrite(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
|
|
|
+ .when(proxies[i]));
|
|
|
+ doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
|
|
|
+ .when(proxies[i]));
|
|
|
+ serviceProxies[i] = mock(HAServiceProtocol.class);
|
|
|
+ Mockito.doAnswer(namenodeAnswers[i].serviceAnswer)
|
|
|
+ .when(serviceProxies[i]).getServiceStatus();
|
|
|
proxyMap.put(namenodeAddrs[i], proxies[i]);
|
|
|
+ serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]);
|
|
|
}
|
|
|
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
|
|
|
Joiner.on(",").join(namenodeIDs));
|
|
|
- proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI,
|
|
|
- ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() {
|
|
|
+ proxyProvider = new ObserverReadProxyProvider<ClientProtocol>(conf, nnURI,
|
|
|
+ ClientProtocol.class,
|
|
|
+ new ClientHAProxyFactory<ClientProtocol>() {
|
|
|
+ @Override
|
|
|
+ public ClientProtocol createProxy(Configuration config,
|
|
|
+ InetSocketAddress nnAddr, Class<ClientProtocol> xface,
|
|
|
+ UserGroupInformation ugi, boolean withRetries,
|
|
|
+ AtomicBoolean fallbackToSimpleAuth) {
|
|
|
+ return proxyMap.get(nnAddr.toString());
|
|
|
+ }
|
|
|
+ }) {
|
|
|
@Override
|
|
|
- public ClientProtocol createProxy(Configuration conf,
|
|
|
- InetSocketAddress nnAddr, Class<ClientProtocol> xface,
|
|
|
- UserGroupInformation ugi, boolean withRetries,
|
|
|
- AtomicBoolean fallbackToSimpleAuth) {
|
|
|
- return proxyMap.get(nnAddr.toString());
|
|
|
+ protected List<NNProxyInfo<ClientProtocol>> getProxyAddresses(
|
|
|
+ URI uri, String addressKey) {
|
|
|
+ List<NNProxyInfo<ClientProtocol>> nnProxies =
|
|
|
+ super.getProxyAddresses(uri, addressKey);
|
|
|
+ for (NNProxyInfo<ClientProtocol> nnProxy : nnProxies) {
|
|
|
+ String addressStr = nnProxy.getAddress().toString();
|
|
|
+ nnProxy.setServiceProxyForTesting(serviceProxyMap.get(addressStr));
|
|
|
+ }
|
|
|
+ return nnProxies;
|
|
|
}
|
|
|
- });
|
|
|
+ };
|
|
|
proxyProvider.setObserverReadEnabled(true);
|
|
|
}
|
|
|
|
|
@@ -275,39 +301,62 @@ public class TestObserverReadProxyProvider {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
|
|
|
- * the state or unreachability of this Answer will make the linked
|
|
|
- * ClientProtocol respond as if it was communicating with a NameNode of
|
|
|
- * the corresponding state. It is in Standby state by default.
|
|
|
+ * An {@link Answer} used for mocking of {@link ClientProtocol} and
|
|
|
+ * {@link HAServiceProtocol}. Setting the state or unreachability of this
|
|
|
+ * Answer will make the linked ClientProtocol respond as if it was
|
|
|
+ * communicating with a NameNode of the corresponding state. It is in Standby
|
|
|
+ * state by default.
|
|
|
*/
|
|
|
- private static class ClientProtocolAnswer implements Answer<Void> {
|
|
|
+ private static class NameNodeAnswer {
|
|
|
|
|
|
private volatile boolean unreachable = false;
|
|
|
// Standby state by default
|
|
|
private volatile boolean allowWrites = false;
|
|
|
private volatile boolean allowReads = false;
|
|
|
|
|
|
- @Override
|
|
|
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
|
|
- if (unreachable) {
|
|
|
- throw new IOException("Unavailable");
|
|
|
+ private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
|
|
|
+ private HAServiceProtocolAnswer serviceAnswer =
|
|
|
+ new HAServiceProtocolAnswer();
|
|
|
+
|
|
|
+ private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
|
|
|
+ @Override
|
|
|
+ public HAServiceStatus answer(InvocationOnMock invocation)
|
|
|
+ throws Throwable {
|
|
|
+ HAServiceStatus status = mock(HAServiceStatus.class);
|
|
|
+ if (allowReads && allowWrites) {
|
|
|
+ when(status.getState()).thenReturn(HAServiceState.ACTIVE);
|
|
|
+ } else if (allowReads) {
|
|
|
+ when(status.getState()).thenReturn(HAServiceState.OBSERVER);
|
|
|
+ } else {
|
|
|
+ when(status.getState()).thenReturn(HAServiceState.STANDBY);
|
|
|
+ }
|
|
|
+ return status;
|
|
|
}
|
|
|
- switch (invocationOnMock.getMethod().getName()) {
|
|
|
+ }
|
|
|
+
|
|
|
+ private class ClientProtocolAnswer implements Answer<Void> {
|
|
|
+ @Override
|
|
|
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
|
|
|
+ if (unreachable) {
|
|
|
+ throw new IOException("Unavailable");
|
|
|
+ }
|
|
|
+ switch (invocationOnMock.getMethod().getName()) {
|
|
|
case "reportBadBlocks":
|
|
|
if (!allowWrites) {
|
|
|
- throw new RemoteException(StandbyException.class.getCanonicalName(),
|
|
|
- "No writes!");
|
|
|
+ throw new RemoteException(
|
|
|
+ StandbyException.class.getCanonicalName(), "No writes!");
|
|
|
}
|
|
|
return null;
|
|
|
case "checkAccess":
|
|
|
if (!allowReads) {
|
|
|
- throw new RemoteException(StandbyException.class.getCanonicalName(),
|
|
|
- "No reads!");
|
|
|
+ throw new RemoteException(
|
|
|
+ StandbyException.class.getCanonicalName(), "No reads!");
|
|
|
}
|
|
|
return null;
|
|
|
default:
|
|
|
throw new IllegalArgumentException(
|
|
|
"Only reportBadBlocks and checkAccess supported!");
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|