|
@@ -66,7 +66,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
*/
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Evolving
|
|
|
-public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
+public class ObserverReadProxyProvider<T>
|
|
|
extends AbstractNNFailoverProxyProvider<T> {
|
|
|
@VisibleForTesting
|
|
|
static final Logger LOG = LoggerFactory.getLogger(
|
|
@@ -189,7 +189,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
// TODO : make this configurable or remove this variable
|
|
|
- this.observerReadEnabled = true;
|
|
|
+ if (wrappedProxy instanceof ClientProtocol) {
|
|
|
+ this.observerReadEnabled = true;
|
|
|
+ } else {
|
|
|
+ LOG.info("Disabling observer reads for {} because the requested proxy "
|
|
|
+ + "class does not implement {}", uri, ClientProtocol.class.getName());
|
|
|
+ this.observerReadEnabled = false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public AlignmentContext getAlignmentContext() {
|
|
@@ -267,7 +273,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
|
|
|
IOException ioe;
|
|
|
try {
|
|
|
- return proxyInfo.proxy.getHAServiceState();
|
|
|
+ return getProxyAsClientProtocol(proxyInfo.proxy).getHAServiceState();
|
|
|
} catch (RemoteException re) {
|
|
|
// Though a Standby will allow a getHAServiceState call, it won't allow
|
|
|
// delegation token lookup, so if DT is used it throws StandbyException
|
|
@@ -284,7 +290,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
LOG.debug("Failed to connect to {} while fetching HAServiceState",
|
|
|
proxyInfo.getAddress(), ioe);
|
|
|
}
|
|
|
- return HAServiceState.STANDBY;
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return the input proxy, cast as a {@link ClientProtocol}. This catches any
|
|
|
+ * {@link ClassCastException} and wraps it in a more helpful message. This
|
|
|
+ * should ONLY be called if the caller is certain that the proxy is, in fact,
|
|
|
+ * a {@link ClientProtocol}.
|
|
|
+ */
|
|
|
+ private ClientProtocol getProxyAsClientProtocol(T proxy) {
|
|
|
+ assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy "
|
|
|
+ + "of class " + proxy.getClass() + " as if it was a ClientProtocol.";
|
|
|
+ return (ClientProtocol) proxy;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -299,7 +317,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
if (msynced) {
|
|
|
return; // No need for an msync
|
|
|
}
|
|
|
- failoverProxy.getProxy().proxy.msync();
|
|
|
+ getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
|
|
|
msynced = true;
|
|
|
lastMsyncTimeMs = Time.monotonicNow();
|
|
|
}
|
|
@@ -315,7 +333,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
private void autoMsyncIfNecessary() throws IOException {
|
|
|
if (autoMsyncPeriodMs == 0) {
|
|
|
// Always msync
|
|
|
- failoverProxy.getProxy().proxy.msync();
|
|
|
+ getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
|
|
|
} else if (autoMsyncPeriodMs > 0) {
|
|
|
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
|
|
|
synchronized (this) {
|
|
@@ -324,7 +342,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
// Re-check the entry criterion since the status may have changed
|
|
|
// while waiting for the lock.
|
|
|
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
|
|
|
- failoverProxy.getProxy().proxy.msync();
|
|
|
+ getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
|
|
|
lastMsyncTimeMs = Time.monotonicNow();
|
|
|
}
|
|
|
}
|
|
@@ -363,6 +381,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
int failedObserverCount = 0;
|
|
|
int activeCount = 0;
|
|
|
int standbyCount = 0;
|
|
|
+ int unreachableCount = 0;
|
|
|
for (int i = 0; i < nameNodeProxies.size(); i++) {
|
|
|
NNProxyInfo<T> current = getCurrentProxy();
|
|
|
HAServiceState currState = current.getCachedState();
|
|
@@ -371,9 +390,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
activeCount++;
|
|
|
} else if (currState == HAServiceState.STANDBY) {
|
|
|
standbyCount++;
|
|
|
+ } else if (currState == null) {
|
|
|
+ unreachableCount++;
|
|
|
}
|
|
|
LOG.debug("Skipping proxy {} for {} because it is in state {}",
|
|
|
- current.proxyInfo, method.getName(), currState);
|
|
|
+ current.proxyInfo, method.getName(),
|
|
|
+ currState == null ? "unreachable" : currState);
|
|
|
changeProxy(current);
|
|
|
continue;
|
|
|
}
|
|
@@ -420,10 +442,10 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
// be that there is simply no Observer node running at all.
|
|
|
if (failedObserverCount > 0) {
|
|
|
// If we get here, it means all observers have failed.
|
|
|
- LOG.warn("{} observers have failed for read request {}; "
|
|
|
- + "also found {} standby, {} active. "
|
|
|
- + "Falling back to active.", failedObserverCount,
|
|
|
- method.getName(), standbyCount, activeCount);
|
|
|
+ LOG.warn("{} observers have failed for read request {}; also found "
|
|
|
+ + "{} standby, {} active, and {} unreachable. Falling back "
|
|
|
+ + "to active.", failedObserverCount, method.getName(),
|
|
|
+ standbyCount, activeCount, unreachableCount);
|
|
|
} else {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Read falling back to active without observer read "
|
|
@@ -432,8 +454,9 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Either all observers have failed, or that it is a write request.
|
|
|
- // In either case, we'll forward the request to active NameNode.
|
|
|
+ // Either all observers have failed, observer reads are disabled,
|
|
|
+ // or this is a write request. In any case, forward the request to
|
|
|
+ // the active NameNode.
|
|
|
LOG.debug("Using failoverProxy to service {}", method.getName());
|
|
|
ProxyInfo<T> activeProxy = failoverProxy.getProxy();
|
|
|
try {
|
|
@@ -455,7 +478,8 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
|
|
|
|
|
|
@Override
|
|
|
public ConnectionId getConnectionId() {
|
|
|
- return RPC.getConnectionIdForProxy(getCurrentProxy().proxy);
|
|
|
+ return RPC.getConnectionIdForProxy(observerReadEnabled
|
|
|
+ ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy);
|
|
|
}
|
|
|
}
|
|
|
|