@@ -20,19 +20,25 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;

 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -60,16 +66,18 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private static final Logger LOG = LoggerFactory.getLogger(
       ObserverReadProxyProvider.class);

-  /** Client-side context for syncing with the NameNode server side */
-  private AlignmentContext alignmentContext;
+  /** Client-side context for syncing with the NameNode server side. */
+  private final AlignmentContext alignmentContext;

-  private AbstractNNFailoverProxyProvider<T> failoverProxy;
-  /** All NameNdoe proxies */
-  private List<NNProxyInfo<T>> nameNodeProxies =
-      new ArrayList<NNProxyInfo<T>>();
-  /** Proxies for the observer namenodes */
-  private final List<NNProxyInfo<T>> observerProxies =
-      new ArrayList<NNProxyInfo<T>>();
+  /** The inner proxy provider used for active/standby failover. */
+  private final AbstractNNFailoverProxyProvider<T> failoverProxy;
+  /** List of all NameNode proxies. */
+  private final List<NNProxyInfo<T>> nameNodeProxies;
+
+  /** The policy used to determine if an exception is fatal or retriable. */
+  private final RetryPolicy observerRetryPolicy;
+  /** The combined proxy which redirects to other proxies as necessary. */
+  private final ProxyInfo<T> combinedProxy;

   /**
    * Whether reading from observer is enabled. If this is false, all read
@@ -78,17 +86,18 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private boolean observerReadEnabled;

   /**
-   * Thread-local index to record the current index in the observer list.
+   * The index into the nameNodeProxies list currently being used. Should only
+   * be accessed in synchronized methods.
    */
-  private static final ThreadLocal<Integer> currentIndex =
-      ThreadLocal.withInitial(new Supplier<Integer>() {
-        @Override
-        public Integer get() {
-          return 0;
-        }
-      });
+  private int currentIndex = -1;

-  /** The last proxy that has been used. Only used for testing */
+  /**
+   * The proxy being used currently. Should only be accessed in synchronized
+   * methods.
+   */
+  private NNProxyInfo<T> currentProxy;
+
+  /** The last proxy that has been used. Only used for testing. */
   private volatile ProxyInfo<T> lastProxy = null;

   /**
@@ -96,63 +105,53 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
    * {@link ConfiguredFailoverProxyProvider} for failover.
    */
   public ObserverReadProxyProvider(
-      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory)
-      throws IOException {
+      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
     this(conf, uri, xface, factory,
-        new ConfiguredFailoverProxyProvider<T>(conf, uri, xface,factory));
+        new ConfiguredFailoverProxyProvider<>(conf, uri, xface,factory));
   }

+  @SuppressWarnings("unchecked")
   public ObserverReadProxyProvider(
       Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
-      AbstractNNFailoverProxyProvider<T> failoverProxy)
-      throws IOException {
+      AbstractNNFailoverProxyProvider<T> failoverProxy) {
     super(conf, uri, xface, factory);
     this.failoverProxy = failoverProxy;
     this.alignmentContext = new ClientGSIContext();
     ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);

+    // Don't bother configuring the number of retries and such on the retry
+    // policy since it is mainly only used for determining whether or not an
+    // exception is retriable or fatal
+    observerRetryPolicy = RetryPolicies.failoverOnNetworkException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, 1);
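+    // (Roughly: network-level failures count as retriable on another node,
+    // while anything else falls through to TRY_ONCE_THEN_FAIL and is
+    // rethrown to the caller.)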
+
     // Get all NameNode proxies
     nameNodeProxies = getProxyAddresses(uri,
         HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
-    // Find out all the observer proxies
-    for (NNProxyInfo<T> pi : nameNodeProxies) {
-      createProxyIfNeeded(pi);
-      if (isObserverState(pi)) {
-        observerProxies.add(pi);
-      }
-    }
-
-    // TODO: No observers is not an error
-    // Just direct all reads go to the active NameNode
-    if (observerProxies.isEmpty()) {
-      throw new RuntimeException("Couldn't find any namenode proxy in " +
-          "OBSERVER state");
-    }
-  }

-  public synchronized AlignmentContext getAlignmentContext() {
-    return alignmentContext;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public synchronized ProxyInfo<T> getProxy() {
-    // We just create a wrapped proxy containing all the proxies
+    // Create a wrapped proxy containing all the proxies. Since this combined
+    // proxy is just redirecting to other proxies, all invocations can share it.
     StringBuilder combinedInfo = new StringBuilder("[");
-
-    for (int i = 0; i < this.observerProxies.size(); i++) {
+    for (int i = 0; i < nameNodeProxies.size(); i++) {
       if (i > 0) {
         combinedInfo.append(",");
       }
-      combinedInfo.append(observerProxies.get(i).proxyInfo);
+      combinedInfo.append(nameNodeProxies.get(i).proxyInfo);
     }
-
     combinedInfo.append(']');
     T wrappedProxy = (T) Proxy.newProxyInstance(
         ObserverReadInvocationHandler.class.getClassLoader(),
-        new Class<?>[]{xface},
-        new ObserverReadInvocationHandler(observerProxies));
-    return new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+        new Class<?>[] { xface }, new ObserverReadInvocationHandler());
+    combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+  }
+
+  public AlignmentContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return combinedProxy;
+  }

   @Override
@@ -165,8 +164,11 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
    *
    * @return whether the 'method' is a read-only operation.
    */
-  private boolean isRead(Method method) {
-    return method.isAnnotationPresent(ReadOnly.class);
+  private static boolean isRead(Method method) {
+    if (!method.isAnnotationPresent(ReadOnly.class)) {
+      return false;
+    }
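+    // Reads marked activeOnly() must still be served by the active
+    // NameNode, so they are not treated as observer reads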
+    return !method.getAnnotation(ReadOnly.class).activeOnly();
   }

   @VisibleForTesting
@@ -174,21 +176,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     this.observerReadEnabled = flag;
   }

-  /**
-   * After getting exception 'ex', whether we should retry the current request
-   * on a different observer.
-   */
-  private boolean shouldRetry(Exception ex) throws Exception {
-    // TODO: implement retry policy
-    return true;
-  }
-
   @VisibleForTesting
   ProxyInfo<T> getLastProxy() {
     return lastProxy;
   }

-  boolean isObserverState(NNProxyInfo<T> pi) {
+  private static <T extends ClientProtocol> HAServiceState getServiceState(
+      NNProxyInfo<T> pi) {
     // TODO: should introduce new ClientProtocol method to verify the
     // underlying service state, which does not require superuser access
     // The is a workaround
@@ -196,7 +190,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     try {
       // Verify write access first
       pi.proxy.reportBadBlocks(new LocatedBlock[0]);
-      return false; // Only active NameNode allows write
+      return HAServiceState.ACTIVE; // Only active NameNode allows write
     } catch (RemoteException re) {
       IOException sbe = re.unwrapRemoteException(StandbyException.class);
       if (!(sbe instanceof StandbyException)) {
@@ -206,15 +200,16 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       ioe = e;
     }
     if (ioe != null) {
-      LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
-      return false;
+      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
+      return HAServiceState.STANDBY; // Just assume standby in this case
+                                     // Anything besides observer is fine
     }
     // Verify read access
     // For now we assume only Observer nodes allow reads
     // Stale reads on StandbyNode should be turned off
     try {
       pi.proxy.checkAccess("/", FsAction.READ);
-      return true;
+      return HAServiceState.OBSERVER;
     } catch (RemoteException re) {
       IOException sbe = re.unwrapRemoteException(StandbyException.class);
       if (!(sbe instanceof StandbyException)) {
@@ -224,29 +219,60 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       ioe = e;
     }
     if (ioe != null) {
-      LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
+      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
     }
-    return false;
+    return HAServiceState.STANDBY;
   }

+  /**
+   * Return the currently used proxy. If there is none, first calls
+   * {@link #changeProxy(NNProxyInfo)} to initialize one.
+   */
+  private NNProxyInfo<T> getCurrentProxy() {
+    if (currentProxy == null) {
+      changeProxy(null);
+    }
+    return currentProxy;
+  }

-  class ObserverReadInvocationHandler implements InvocationHandler {
-    final List<NNProxyInfo<T>> observerProxies;
-    final ProxyInfo<T> activeProxy;
-
-    ObserverReadInvocationHandler(List<NNProxyInfo<T>> observerProxies) {
-      this.observerProxies = observerProxies;
-      this.activeProxy = failoverProxy.getProxy();
+  /**
+   * Move to the next proxy in the proxy list. If the NNProxyInfo supplied by
+   * the caller does not match the current proxy, the call is ignored; this is
+   * to handle concurrent calls (to avoid changing the proxy multiple times).
+   * The service state of the newly selected proxy will be updated before
+   * returning.
+   *
+   * @param initial The expected current proxy
+   */
+  private synchronized void changeProxy(NNProxyInfo<T> initial) {
+    if (currentProxy != initial) {
+      // Must have been a concurrent modification; ignore the move request
+      return;
     }
+    // Attempt to force concurrent callers of getCurrentProxy to wait for the
+    // new proxy; best-effort by setting currentProxy to null
+    currentProxy = null;
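+    // Advance through the proxy list in round-robin fashion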
+    currentIndex = (currentIndex + 1) % nameNodeProxies.size();
+    currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
+    currentProxy.setCachedState(getServiceState(currentProxy));
+    LOG.debug("Changed current proxy from {} to {}",
+        initial == null ? "none" : initial.proxyInfo,
+        currentProxy.proxyInfo);
+  }
+
+  /**
+   * An InvocationHandler to handle incoming requests. This class's invoke
+   * method contains the primary logic for redirecting to observers.
+   *
+   * If observer reads are enabled, attempt to send read operations to the
+   * current proxy. If it is not an observer, or the observer fails, adjust
+   * the current proxy and retry on the next one. If all proxies are tried
+   * without success, the request is forwarded to the active.
+   *
+   * Write requests are always forwarded to the active.
+   */
+  private class ObserverReadInvocationHandler implements InvocationHandler {

-    /**
-     * Sends read operations to the observer (if enabled) specified by the
-     * current index, and send write operations to the active. If a observer
-     * fails, we increment the index and retry the next one. If all observers
-     * fail, the request is forwarded to the active.
-     *
-     * Write requests are always forwarded to the active.
-     */
     @Override
     public Object invoke(Object proxy, final Method method, final Object[] args)
         throws Throwable {
@@ -254,33 +280,65 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       Object retVal;

       if (observerReadEnabled && isRead(method)) {
-        // Loop through all the proxies, starting from the current index.
-        for (int i = 0; i < observerProxies.size(); i++) {
-          NNProxyInfo<T> current = observerProxies.get(currentIndex.get());
+        int failedObserverCount = 0;
+        int activeCount = 0;
+        int standbyCount = 0;
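+        // Tally the non-observer states seen, so the warning below can
+        // summarize why the read fell back to the active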
+        for (int i = 0; i < nameNodeProxies.size(); i++) {
+          NNProxyInfo<T> current = getCurrentProxy();
+          HAServiceState currState = current.getCachedState();
+          if (currState != HAServiceState.OBSERVER) {
+            if (currState == HAServiceState.ACTIVE) {
+              activeCount++;
+            } else if (currState == HAServiceState.STANDBY) {
+              standbyCount++;
+            }
+            LOG.debug("Skipping proxy {} for {} because it is in state {}",
+                current.proxyInfo, method.getName(), currState);
+            changeProxy(current);
+            continue;
+          }
+          LOG.debug("Attempting to service {} using proxy {}",
+              method.getName(), current.proxyInfo);
           try {
             retVal = method.invoke(current.proxy, args);
             lastProxy = current;
+            LOG.debug("Invocation of {} using {} was successful",
+                method.getName(), current.proxyInfo);
             return retVal;
-          } catch (Exception e) {
-            if (!shouldRetry(e)) {
+          } catch (InvocationTargetException ite) {
+            if (!(ite.getCause() instanceof Exception)) {
+              throw ite.getCause();
+            }
+            Exception e = (Exception) ite.getCause();
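+            // Ask the retry policy whether the failure is fatal; whether the
+            // method is idempotent/at-most-once decides if it is safe to
+            // re-invoke on another node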
+            RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
+                method.isAnnotationPresent(Idempotent.class)
+                    || method.isAnnotationPresent(AtMostOnce.class));
+            if (retryInfo.action == RetryAction.RetryDecision.FAIL) {
               throw e;
+            } else {
+              failedObserverCount++;
+              LOG.warn(
+                  "Invocation returned exception on [{}]; {} failure(s) so far",
+                  current.proxyInfo, failedObserverCount, e);
+              changeProxy(current);
             }
-            currentIndex.set((currentIndex.get() + 1) % observerProxies.size());
-            LOG.warn("Invocation returned exception on [{}]",
-                current.proxyInfo, e.getCause());
           }
         }

         // If we get here, it means all observers have failed.
-        LOG.warn("All observers have failed for read request {}. " +
-            "Fall back on active: {}", method.getName(), activeProxy);
+        LOG.warn("{} observers have failed for read request {}; also found " +
+            "{} standby and {} active. Falling back to active.",
+            failedObserverCount, method.getName(), standbyCount, activeCount);
       }

       // Either all observers have failed, or that it is a write request.
       // In either case, we'll forward the request to active NameNode.
+      LOG.debug("Using failoverProxy to service {}", method.getName());
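+      // Fetch the active proxy afresh on each invocation (instead of caching
+      // one at construction time) so that failovers performed by the inner
+      // provider are picked up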
+      ProxyInfo<T> activeProxy = failoverProxy.getProxy();
       try {
         retVal = method.invoke(activeProxy.proxy, args);
-      } catch (Exception e) {
+      } catch (InvocationTargetException e) {
+        // This exception will be handled by higher layers
        throw e.getCause();
       }
       lastProxy = activeProxy;
@@ -290,7 +348,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>

   @Override
   public synchronized void close() throws IOException {
-    failoverProxy.close();
     for (ProxyInfo<T> pi : nameNodeProxies) {
       if (pi.proxy != null) {
         if (pi.proxy instanceof Closeable) {
@@ -298,8 +355,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         } else {
           RPC.stopProxy(pi.proxy);
         }
+        // Set to null to avoid the failoverProxy having to re-do the close
+        // if it is sharing a proxy instance
+        pi.proxy = null;
       }
     }
+    failoverProxy.close();
   }

   @Override